asfgit closed pull request #6389: [FLINK-9917][JM] Remove superfluous lock from SlotSharingManager URL: https://github.com/apache/flink/pull/6389
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java index afcd24f1064..ef288a26469 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java @@ -36,7 +36,6 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; import java.util.AbstractCollection; import java.util.Collection; @@ -82,9 +81,6 @@ private static final Logger LOG = LoggerFactory.getLogger(SlotSharingManager.class); - /** Lock for the internal data structures. */ - private final Object lock = new Object(); - private final SlotSharingGroupId slotSharingGroupId; /** Actions to release allocated slots after a complete multi task slot hierarchy has been released. */ @@ -96,11 +92,9 @@ private final Map<SlotRequestId, TaskSlot> allTaskSlots; /** Root nodes which have not been completed because the allocated slot is still pending. */ - @GuardedBy("lock") private final Map<SlotRequestId, MultiTaskSlot> unresolvedRootSlots; /** Root nodes which have been completed (the underlying allocated slot has been assigned). */ - @GuardedBy("lock") private final Map<TaskManagerLocation, Set<MultiTaskSlot>> resolvedRootSlots; SlotSharingManager( @@ -152,27 +146,23 @@ MultiTaskSlot createRootSlot( allTaskSlots.put(slotRequestId, rootMultiTaskSlot); - synchronized (lock) { - unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot); - } + unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot); // add the root node to the set of resolved root nodes once the SlotContext future has // been completed and we know the slot's TaskManagerLocation slotContextFuture.whenComplete( (SlotContext slotContext, Throwable throwable) -> { if (slotContext != null) { - synchronized (lock) { - final MultiTaskSlot resolvedRootNode = unresolvedRootSlots.remove(slotRequestId); + final MultiTaskSlot resolvedRootNode = unresolvedRootSlots.remove(slotRequestId); - if (resolvedRootNode != null) { - LOG.trace("Fulfill multi task slot [{}] with slot [{}].", slotRequestId, slotContext.getAllocationId()); + if (resolvedRootNode != null) { + LOG.trace("Fulfill multi task slot [{}] with slot [{}].", slotRequestId, slotContext.getAllocationId()); - final Set<MultiTaskSlot> innerCollection = resolvedRootSlots.computeIfAbsent( - slotContext.getTaskManagerLocation(), - taskManagerLocation -> new HashSet<>(4)); + final Set<MultiTaskSlot> innerCollection = resolvedRootSlots.computeIfAbsent( + slotContext.getTaskManagerLocation(), + taskManagerLocation -> new HashSet<>(4)); - innerCollection.add(resolvedRootNode); - } + innerCollection.add(resolvedRootNode); } } else { rootMultiTaskSlot.release(throwable); @@ -193,15 +183,13 @@ MultiTaskSlot createRootSlot( */ @Nullable MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, SchedulingStrategy matcher, SlotProfile slotProfile) { - synchronized (lock) { - Collection<Set<MultiTaskSlot>> resolvedRootSlotsValues = this.resolvedRootSlots.values(); - return matcher.findMatchWithLocality( - slotProfile, - resolvedRootSlotsValues.stream().flatMap(Collection::stream), - (MultiTaskSlot multiTaskSlot) -> multiTaskSlot.getSlotContextFuture().join(), - (MultiTaskSlot multiTaskSlot) -> !multiTaskSlot.contains(groupId), - MultiTaskSlotLocality::of); - } + Collection<Set<MultiTaskSlot>> resolvedRootSlotsValues = this.resolvedRootSlots.values(); + return matcher.findMatchWithLocality( + slotProfile, + resolvedRootSlotsValues.stream().flatMap(Collection::stream), + (MultiTaskSlot multiTaskSlot) -> multiTaskSlot.getSlotContextFuture().join(), + (MultiTaskSlot multiTaskSlot) -> !multiTaskSlot.contains(groupId), + MultiTaskSlotLocality::of); } /** @@ -213,11 +201,9 @@ MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, SchedulingStrategy */ @Nullable MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) { - synchronized (lock) { - for (MultiTaskSlot multiTaskSlot : unresolvedRootSlots.values()) { - if (!multiTaskSlot.contains(groupId)) { - return multiTaskSlot; - } + for (MultiTaskSlot multiTaskSlot : unresolvedRootSlots.values()) { + if (!multiTaskSlot.contains(groupId)) { + return multiTaskSlot; } } @@ -228,11 +214,9 @@ MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) { public String toString() { final StringBuilder builder = new StringBuilder("{\n\tgroupId=").append(slotSharingGroupId).append('\n'); - synchronized (lock) { - builder.append("\tunresolved=").append(unresolvedRootSlots).append('\n'); - builder.append("\tresolved=").append(resolvedRootSlots).append('\n'); - builder.append("\tall=").append(allTaskSlots).append('\n'); - } + builder.append("\tunresolved=").append(unresolvedRootSlots).append('\n'); + builder.append("\tresolved=").append(resolvedRootSlots).append('\n'); + builder.append("\tall=").append(allTaskSlots).append('\n'); return builder.append('}').toString(); } @@ -479,26 +463,20 @@ public void release(Throwable cause) { parent.releaseChild(getGroupId()); } else if (allTaskSlots.remove(getSlotRequestId()) != null) { // we are the root node --> remove the root node from the list of task slots + final MultiTaskSlot unresolvedRootSlot = unresolvedRootSlots.remove(getSlotRequestId()); - if (!slotContextFuture.isDone() || slotContextFuture.isCompletedExceptionally()) { - synchronized (lock) { - // the root node should still be unresolved - unresolvedRootSlots.remove(getSlotRequestId()); - } - } else { + if (unresolvedRootSlot == null) { // the root node should be resolved --> we can access the slot context final SlotContext slotContext = slotContextFuture.getNow(null); if (slotContext != null) { - synchronized (lock) { - final Set<MultiTaskSlot> multiTaskSlots = resolvedRootSlots.get(slotContext.getTaskManagerLocation()); + final Set<MultiTaskSlot> multiTaskSlots = resolvedRootSlots.get(slotContext.getTaskManagerLocation()); - if (multiTaskSlots != null) { - multiTaskSlots.remove(this); + if (multiTaskSlots != null) { + multiTaskSlots.remove(this); - if (multiTaskSlots.isEmpty()) { - resolvedRootSlots.remove(slotContext.getTaskManagerLocation()); - } + if (multiTaskSlots.isEmpty()) { + resolvedRootSlots.remove(slotContext.getTaskManagerLocation()); } } } @@ -637,9 +615,7 @@ public String toString() { @VisibleForTesting Collection<MultiTaskSlot> getUnresolvedRootSlots() { - synchronized (lock) { - return unresolvedRootSlots.values(); - } + return unresolvedRootSlots.values(); } /** @@ -649,19 +625,15 @@ public String toString() { @Override public Iterator<MultiTaskSlot> iterator() { - synchronized (lock) { - return new ResolvedRootSlotIterator(resolvedRootSlots.values().iterator()); - } + return new ResolvedRootSlotIterator(resolvedRootSlots.values().iterator()); } @Override public int size() { int numberResolvedMultiTaskSlots = 0; - synchronized (lock) { - for (Set<MultiTaskSlot> multiTaskSlots : resolvedRootSlots.values()) { - numberResolvedMultiTaskSlots += multiTaskSlots.size(); - } + for (Set<MultiTaskSlot> multiTaskSlots : resolvedRootSlots.values()) { + numberResolvedMultiTaskSlots += multiTaskSlots.size(); } return numberResolvedMultiTaskSlots; ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services