asfgit closed pull request #6389: [FLINK-9917][JM] Remove superfluous lock from 
SlotSharingManager
URL: https://github.com/apache/flink/pull/6389
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index afcd24f1064..ef288a26469 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -36,7 +36,6 @@
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
 
 import java.util.AbstractCollection;
 import java.util.Collection;
@@ -82,9 +81,6 @@
 
        private static final Logger LOG = 
LoggerFactory.getLogger(SlotSharingManager.class);
 
-       /** Lock for the internal data structures. */
-       private final Object lock = new Object();
-
        private final SlotSharingGroupId slotSharingGroupId;
 
        /** Actions to release allocated slots after a complete multi task slot 
hierarchy has been released. */
@@ -96,11 +92,9 @@
        private final Map<SlotRequestId, TaskSlot> allTaskSlots;
 
        /** Root nodes which have not been completed because the allocated slot 
is still pending. */
-       @GuardedBy("lock")
        private final Map<SlotRequestId, MultiTaskSlot> unresolvedRootSlots;
 
        /** Root nodes which have been completed (the underlying allocated slot 
has been assigned). */
-       @GuardedBy("lock")
        private final Map<TaskManagerLocation, Set<MultiTaskSlot>> 
resolvedRootSlots;
 
        SlotSharingManager(
@@ -152,27 +146,23 @@ MultiTaskSlot createRootSlot(
 
                allTaskSlots.put(slotRequestId, rootMultiTaskSlot);
 
-               synchronized (lock) {
-                       unresolvedRootSlots.put(slotRequestId, 
rootMultiTaskSlot);
-               }
+               unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot);
 
                // add the root node to the set of resolved root nodes once the 
SlotContext future has
                // been completed and we know the slot's TaskManagerLocation
                slotContextFuture.whenComplete(
                        (SlotContext slotContext, Throwable throwable) -> {
                                if (slotContext != null) {
-                                       synchronized (lock) {
-                                               final MultiTaskSlot 
resolvedRootNode = unresolvedRootSlots.remove(slotRequestId);
+                                       final MultiTaskSlot resolvedRootNode = 
unresolvedRootSlots.remove(slotRequestId);
 
-                                               if (resolvedRootNode != null) {
-                                                       LOG.trace("Fulfill 
multi task slot [{}] with slot [{}].", slotRequestId, 
slotContext.getAllocationId());
+                                       if (resolvedRootNode != null) {
+                                               LOG.trace("Fulfill multi task 
slot [{}] with slot [{}].", slotRequestId, slotContext.getAllocationId());
 
-                                                       final 
Set<MultiTaskSlot> innerCollection = resolvedRootSlots.computeIfAbsent(
-                                                               
slotContext.getTaskManagerLocation(),
-                                                               
taskManagerLocation -> new HashSet<>(4));
+                                               final Set<MultiTaskSlot> 
innerCollection = resolvedRootSlots.computeIfAbsent(
+                                                       
slotContext.getTaskManagerLocation(),
+                                                       taskManagerLocation -> 
new HashSet<>(4));
 
-                                                       
innerCollection.add(resolvedRootNode);
-                                               }
+                                               
innerCollection.add(resolvedRootNode);
                                        }
                                } else {
                                        rootMultiTaskSlot.release(throwable);
@@ -193,15 +183,13 @@ MultiTaskSlot createRootSlot(
         */
        @Nullable
        MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, 
SchedulingStrategy matcher, SlotProfile slotProfile) {
-               synchronized (lock) {
-                       Collection<Set<MultiTaskSlot>> resolvedRootSlotsValues 
= this.resolvedRootSlots.values();
-                       return matcher.findMatchWithLocality(
-                               slotProfile,
-                               
resolvedRootSlotsValues.stream().flatMap(Collection::stream),
-                               (MultiTaskSlot multiTaskSlot) -> 
multiTaskSlot.getSlotContextFuture().join(),
-                               (MultiTaskSlot multiTaskSlot) -> 
!multiTaskSlot.contains(groupId),
-                               MultiTaskSlotLocality::of);
-               }
+               Collection<Set<MultiTaskSlot>> resolvedRootSlotsValues = 
this.resolvedRootSlots.values();
+               return matcher.findMatchWithLocality(
+                       slotProfile,
+                       
resolvedRootSlotsValues.stream().flatMap(Collection::stream),
+                       (MultiTaskSlot multiTaskSlot) -> 
multiTaskSlot.getSlotContextFuture().join(),
+                       (MultiTaskSlot multiTaskSlot) -> 
!multiTaskSlot.contains(groupId),
+                       MultiTaskSlotLocality::of);
        }
 
        /**
@@ -213,11 +201,9 @@ MultiTaskSlotLocality getResolvedRootSlot(AbstractID 
groupId, SchedulingStrategy
         */
        @Nullable
        MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) {
-               synchronized (lock) {
-                       for (MultiTaskSlot multiTaskSlot : 
unresolvedRootSlots.values()) {
-                               if (!multiTaskSlot.contains(groupId)) {
-                                       return multiTaskSlot;
-                               }
+               for (MultiTaskSlot multiTaskSlot : 
unresolvedRootSlots.values()) {
+                       if (!multiTaskSlot.contains(groupId)) {
+                               return multiTaskSlot;
                        }
                }
 
@@ -228,11 +214,9 @@ MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) {
        public String toString() {
                final StringBuilder builder = new 
StringBuilder("{\n\tgroupId=").append(slotSharingGroupId).append('\n');
 
-               synchronized (lock) {
-                       
builder.append("\tunresolved=").append(unresolvedRootSlots).append('\n');
-                       
builder.append("\tresolved=").append(resolvedRootSlots).append('\n');
-                       
builder.append("\tall=").append(allTaskSlots).append('\n');
-               }
+               
builder.append("\tunresolved=").append(unresolvedRootSlots).append('\n');
+               
builder.append("\tresolved=").append(resolvedRootSlots).append('\n');
+               builder.append("\tall=").append(allTaskSlots).append('\n');
 
                return builder.append('}').toString();
        }
@@ -479,26 +463,20 @@ public void release(Throwable cause) {
                                parent.releaseChild(getGroupId());
                        } else if (allTaskSlots.remove(getSlotRequestId()) != 
null) {
                                // we are the root node --> remove the root 
node from the list of task slots
+                               final MultiTaskSlot unresolvedRootSlot = 
unresolvedRootSlots.remove(getSlotRequestId());
 
-                               if (!slotContextFuture.isDone() || 
slotContextFuture.isCompletedExceptionally()) {
-                                       synchronized (lock) {
-                                               // the root node should still 
be unresolved
-                                               
unresolvedRootSlots.remove(getSlotRequestId());
-                                       }
-                               } else {
+                               if (unresolvedRootSlot == null) {
                                        // the root node should be resolved --> 
we can access the slot context
                                        final SlotContext slotContext = 
slotContextFuture.getNow(null);
 
                                        if (slotContext != null) {
-                                               synchronized (lock) {
-                                                       final 
Set<MultiTaskSlot> multiTaskSlots = 
resolvedRootSlots.get(slotContext.getTaskManagerLocation());
+                                               final Set<MultiTaskSlot> 
multiTaskSlots = resolvedRootSlots.get(slotContext.getTaskManagerLocation());
 
-                                                       if (multiTaskSlots != 
null) {
-                                                               
multiTaskSlots.remove(this);
+                                               if (multiTaskSlots != null) {
+                                                       
multiTaskSlots.remove(this);
 
-                                                               if 
(multiTaskSlots.isEmpty()) {
-                                                                       
resolvedRootSlots.remove(slotContext.getTaskManagerLocation());
-                                                               }
+                                                       if 
(multiTaskSlots.isEmpty()) {
+                                                               
resolvedRootSlots.remove(slotContext.getTaskManagerLocation());
                                                        }
                                                }
                                        }
@@ -637,9 +615,7 @@ public String toString() {
 
        @VisibleForTesting
        Collection<MultiTaskSlot> getUnresolvedRootSlots() {
-               synchronized (lock) {
-                       return unresolvedRootSlots.values();
-               }
+               return unresolvedRootSlots.values();
        }
 
        /**
@@ -649,19 +625,15 @@ public String toString() {
 
                @Override
                public Iterator<MultiTaskSlot> iterator() {
-                       synchronized (lock) {
-                               return new 
ResolvedRootSlotIterator(resolvedRootSlots.values().iterator());
-                       }
+                       return new 
ResolvedRootSlotIterator(resolvedRootSlots.values().iterator());
                }
 
                @Override
                public int size() {
                        int numberResolvedMultiTaskSlots = 0;
 
-                       synchronized (lock) {
-                               for (Set<MultiTaskSlot> multiTaskSlots : 
resolvedRootSlots.values()) {
-                                       numberResolvedMultiTaskSlots += 
multiTaskSlots.size();
-                               }
+                       for (Set<MultiTaskSlot> multiTaskSlots : 
resolvedRootSlots.values()) {
+                               numberResolvedMultiTaskSlots += 
multiTaskSlots.size();
                        }
 
                        return numberResolvedMultiTaskSlots;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to