tillrohrmann commented on a change in pull request #13964:
URL: https://github.com/apache/flink/pull/13964#discussion_r540222605



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
##########
@@ -0,0 +1,680 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo;
+import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.slots.ResourceRequirements;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.clock.Clock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * {@link SlotPool} implementation which uses the {@link DeclarativeSlotPool} to allocate slots.
+ */
+public class DeclarativeSlotPoolBridge implements SlotPool {
+
+       private static final Logger LOG = LoggerFactory.getLogger(DeclarativeSlotPoolBridge.class);
+
+       private final JobID jobId;
+
+       private final Map<SlotRequestId, PendingRequest> pendingRequests;
+       private final Map<SlotRequestId, AllocationID> fulfilledRequests;
+       private final Set<AllocationID> newSlotsSet;
+       private final DeclarativeSlotPool declarativeSlotPool;
+       private final Set<ResourceID> registeredTaskManagers;
+
+       @Nullable
+       private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+       @Nullable
+       private String jobManagerAddress;
+
+       @Nullable
+       private JobMasterId jobMasterId;
+
+       private DeclareResourceRequirementServiceConnectionManager declareResourceRequirementServiceConnectionManager;
+
+       private final Clock clock;
+       private final Time rpcTimeout;
+       private final Time idleSlotTimeout;
+       private final Time batchSlotTimeout;
+       private boolean isBatchSlotRequestTimeoutCheckDisabled;
+       private final Duration resourceAcquisitionTimeout;
+
+       public DeclarativeSlotPoolBridge(
+                       JobID jobId,
+                       DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
+                       Clock clock,
+                       Time rpcTimeout,
+                       Time idleSlotTimeout,
+                       Time batchSlotTimeout, Duration resourceAcquisitionTimeout) {
+               this.jobId = Preconditions.checkNotNull(jobId);
+               this.clock = Preconditions.checkNotNull(clock);
+               this.rpcTimeout = Preconditions.checkNotNull(rpcTimeout);
+               this.idleSlotTimeout = Preconditions.checkNotNull(idleSlotTimeout);
+               this.batchSlotTimeout = Preconditions.checkNotNull(batchSlotTimeout);
+               this.isBatchSlotRequestTimeoutCheckDisabled = false;
+               this.resourceAcquisitionTimeout = Preconditions.checkNotNull(resourceAcquisitionTimeout);
+
+               this.pendingRequests = new LinkedHashMap<>();
+               this.fulfilledRequests = new HashMap<>();
+               this.newSlotsSet = new HashSet<>();
+               this.registeredTaskManagers = new HashSet<>();
+               this.declareResourceRequirementServiceConnectionManager = NoOpDeclareResourceRequirementServiceConnectionManager.INSTANCE;
+               this.declarativeSlotPool = declarativeSlotPoolFactory.create(
+                               this::declareResourceRequirements,
+                               this::newSlotsAreAvailable,
+                               idleSlotTimeout,
+                               rpcTimeout);
+       }
+
+       @Override
+       public void start(JobMasterId jobMasterId, String newJobManagerAddress, ComponentMainThreadExecutor jmMainThreadScheduledExecutor) throws Exception {
+               this.componentMainThreadExecutor = Preconditions.checkNotNull(jmMainThreadScheduledExecutor);
+               this.jobManagerAddress = Preconditions.checkNotNull(newJobManagerAddress);
+               this.jobMasterId = Preconditions.checkNotNull(jobMasterId);
+               this.declareResourceRequirementServiceConnectionManager = DefaultDeclareResourceRequirementServiceConnectionManager.create(componentMainThreadExecutor);
+
+               componentMainThreadExecutor.schedule(this::checkIdleSlotTimeout, idleSlotTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+               componentMainThreadExecutor.schedule(this::checkBatchSlotTimeout, batchSlotTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+       }
+
+       @Override
+       public void suspend() {
+               assertRunningInMainThread();
+               LOG.info("Suspending slot pool.");
+
+               cancelPendingRequests(new FlinkException("Suspending slot pool."), request -> true);
+               clearState();
+       }
+
+       @Override
+       public void close() {
+               LOG.info("Closing slot pool.");
+               final FlinkException cause = new FlinkException("Closing slot pool");
+               cancelPendingRequests(cause, request -> true);
+               releaseAllTaskManagers(new FlinkException("Closing slot pool."));
+               clearState();
+       }
+
+       private void cancelPendingRequests(FlinkException cancelCause, Predicate<PendingRequest> requestPredicate) {
+               ResourceCounter decreasedResourceRequirements = ResourceCounter.empty();
+
+               // need a copy since failing a request could trigger another request to be issued
+               final Iterable<PendingRequest> pendingRequestsToFail = new ArrayList<>(pendingRequests.values());
+               pendingRequests.clear();
+
+               for (PendingRequest pendingRequest : pendingRequestsToFail) {
+                       if (requestPredicate.test(pendingRequest)) {
+                               pendingRequest.failRequest(cancelCause);
+                               decreasedResourceRequirements = decreasedResourceRequirements.add(pendingRequest.getResourceProfile(), 1);
+                       } else {
+                               pendingRequests.put(pendingRequest.slotRequestId, pendingRequest);
+                       }
+               }
+
+               if (!decreasedResourceRequirements.isEmpty()) {
+                       declarativeSlotPool.decreaseResourceRequirementsBy(decreasedResourceRequirements);
+               }
+       }
+
+       private void clearState() {
+               declareResourceRequirementServiceConnectionManager.close();
+               declareResourceRequirementServiceConnectionManager = NoOpDeclareResourceRequirementServiceConnectionManager.INSTANCE;
+               registeredTaskManagers.clear();
+               jobManagerAddress = null;
+               jobMasterId = null;
+       }
+
+       @Override
+       public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) {
+               assertRunningInMainThread();
+               Preconditions.checkNotNull(resourceManagerGateway);
+
+               declareResourceRequirementServiceConnectionManager.connect(resourceRequirements -> resourceManagerGateway.declareRequiredResources(jobMasterId, resourceRequirements, rpcTimeout));
+               declareResourceRequirements(declarativeSlotPool.getResourceRequirements());
+       }
+
+       @Override
+       public void disconnectResourceManager() {
+               assertRunningInMainThread();
+               this.declareResourceRequirementServiceConnectionManager.disconnect();
+       }
+
+       @Override
+       public boolean registerTaskManager(ResourceID resourceID) {
+               assertRunningInMainThread();
+
+               LOG.debug("Register new TaskExecutor {}.", resourceID);
+               return registeredTaskManagers.add(resourceID);
+       }
+
+       @Override
+       public boolean releaseTaskManager(ResourceID resourceId, Exception cause) {
+               assertRunningInMainThread();
+
+               if (registeredTaskManagers.remove(resourceId)) {
+                       internalReleaseTaskManager(resourceId, cause);
+                       return true;
+               } else {
+                       return false;
+               }
+       }
+
+       private void releaseAllTaskManagers(FlinkException cause) {
+               for (ResourceID registeredTaskManager : registeredTaskManagers) {
+                       internalReleaseTaskManager(registeredTaskManager, cause);
+               }
+
+               registeredTaskManagers.clear();
+       }
+
+       private void internalReleaseTaskManager(ResourceID resourceId, Exception cause) {
+               ResourceCounter previouslyFulfilledRequirement = declarativeSlotPool.releaseSlots(resourceId, cause);
+               declarativeSlotPool.decreaseResourceRequirementsBy(previouslyFulfilledRequirement);
+       }
+
+       @Override
+       public Collection<SlotOffer> offerSlots(TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, Collection<SlotOffer> offers) {
+               assertRunningInMainThread();
+               Preconditions.checkNotNull(taskManagerGateway);
+               Preconditions.checkNotNull(offers);
+
+               if (!registeredTaskManagers.contains(taskManagerLocation.getResourceID())) {
+                       return Collections.emptyList();
+               }
+
+               return declarativeSlotPool.offerSlots(offers, taskManagerLocation, taskManagerGateway, clock.relativeTimeMillis());
+       }
+
+       @VisibleForTesting
+       void newSlotsAreAvailable(Collection<? extends PhysicalSlot> newSlots) {
+               // Avoid notifying new slots multiple times due to SchedulerImpl allocating and releasing slots
+               // in order to find the best shared slot
+               final Collection<PhysicalSlot> slotsToProcess = new ArrayList<>();
+               for (PhysicalSlot newSlot : newSlots) {
+                       if (newSlotsSet.add(newSlot.getAllocationId())) {
+                               slotsToProcess.add(newSlot);
+                       }
+               }
+
+               final Collection<PendingRequestSlotMatching> matchingsToFulfill = new ArrayList<>();
+
+               for (PhysicalSlot newSlot : slotsToProcess) {
+                       final Optional<PendingRequest> matchingPendingRequest = findMatchingPendingRequest(newSlot);
+
+                       matchingPendingRequest.ifPresent(pendingRequest -> {
+                               Preconditions.checkNotNull(pendingRequests.remove(pendingRequest.getSlotRequestId()), "Cannot fulfill a non existing pending slot request.");
+                               reserveFreeSlot(pendingRequest.getSlotRequestId(), newSlot.getAllocationId(), pendingRequest.resourceProfile);
+
+                               matchingsToFulfill.add(PendingRequestSlotMatching.createFor(pendingRequest, newSlot));
+                       });
+
+                       newSlotsSet.remove(newSlot.getAllocationId());

Review comment:
       I think I could reproduce the problem using the `TimeStampITCase` with 
the commit before tillrohrmann@2c02f54. The problem was that we were reusing 
the same `MultiTaskSlot` for the same `JobVertexID` twice. I think the problem 
is the following: When scheduling a subtask `a`, we allocate a `MultiTaskSlot` 
in `SchedulerImpl.allocateMultiTaskSlot` by picking a `MultiTaskSlot` which 
does not yet contain the `JobVertexID`. Once we have it, we try to allocate an 
available slot to see whether it has better locality. If this is not the case, 
we release the slot, which triggers the `newSlotsAreAvailable` method again. 
If now the last input for a subtask `b` of the same `JobVertexID` is pending, 
the released slot could be used to fulfill the request. This would then 
trigger the scheduling of `b`, which could take the same `MultiTaskSlot` that 
`a` had picked but not yet reserved. The problem is that, due to the direct 
execution of the futures, we can have an interleaved execution where multiple 
calls to `SchedulerImpl.allocateMultiTaskSlot` are nested within each other:
   
   ```
   start allocateMultiTaskSlot(a)
   start allocateMultiTaskSlot(b)
   start allocateMultiTaskSlot(c)
   end allocateMultiTaskSlot(c)
   end allocateMultiTaskSlot(b)
   end allocateMultiTaskSlot(a)
   ```
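   
   For illustration, here is a minimal, hypothetical Java sketch (the class and 
method names are made up and simplified, not the actual `SchedulerImpl` API) of 
how a callback registered on an already-completed future runs directly on the 
calling thread, so that a release performed inside one allocation nests the 
next allocation inside it and produces exactly the start/end interleaving shown 
above:
   
   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   import java.util.concurrent.CompletableFuture;
   
   /**
    * Hypothetical, simplified sketch (not the actual SchedulerImpl code):
    * callbacks on already-completed futures execute synchronously on the
    * calling thread, so a "release" inside allocate() can re-enter allocate()
    * for another pending request before the first call has finished.
    */
   public class NestedAllocationSketch {
   
       private final Deque<String> callStack = new ArrayDeque<>();
   
       CompletableFuture<String> allocate(String subtask) {
           callStack.push(subtask);
           System.out.println("start allocate(" + subtask + "), nesting depth = " + callStack.size());
   
           // Pretend we probed an available slot for better locality, did not
           // take it, and released it again; the release completes a future
           // whose callback synchronously triggers the next pending allocation.
           if (!subtask.equals("c")) {
               final String next = subtask.equals("a") ? "b" : "c";
               CompletableFuture.completedFuture(next)
                       // thenAccept on a completed future runs on this thread,
                       // nesting the next allocate() inside the current one
                       .thenAccept(this::allocate);
           }
   
           System.out.println("end allocate(" + subtask + ")");
           callStack.pop();
           return CompletableFuture.completedFuture(subtask);
       }
   
       public static void main(String[] args) {
           // Prints: start a, start b, start c, end c, end b, end a --
           // the same nested interleaving as in the trace above.
           new NestedAllocationSketch().allocate("a");
       }
   }
   ```
   
   This matches the trace above; the `newSlotsSet` guard in 
`newSlotsAreAvailable` in the diff above seems intended precisely to avoid 
re-notifying the same slot during such nested release/allocate cycles.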




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

