[ 
https://issues.apache.org/jira/browse/FLINK-5810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15928748#comment-15928748
 ] 

ASF GitHub Bot commented on FLINK-5810:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3394#discussion_r106512992
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ---
    @@ -21,519 +21,897 @@
     import org.apache.flink.annotation.VisibleForTesting;
     import org.apache.flink.api.common.time.Time;
     import org.apache.flink.runtime.clusterframework.types.AllocationID;
    -import org.apache.flink.runtime.clusterframework.types.ResourceID;
    -import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
    +import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
    +import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
     import org.apache.flink.runtime.clusterframework.types.SlotID;
     import org.apache.flink.runtime.concurrent.BiFunction;
    +import org.apache.flink.runtime.concurrent.CompletableFuture;
     import org.apache.flink.runtime.concurrent.Future;
    -import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
    +import org.apache.flink.runtime.concurrent.ScheduledExecutor;
    +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.Acknowledge;
     import org.apache.flink.runtime.resourcemanager.SlotRequest;
    -import 
org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRegistered;
    -import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
    -import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
    -import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration;
    +import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
    +import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
     import org.apache.flink.runtime.taskexecutor.SlotReport;
     import org.apache.flink.runtime.taskexecutor.SlotStatus;
    +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
    +import 
org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
    +import 
org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
     import org.apache.flink.util.Preconditions;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
     import java.util.LinkedHashMap;
     import java.util.Map;
    -
    -import static org.apache.flink.util.Preconditions.checkNotNull;
    +import java.util.Objects;
    +import java.util.UUID;
    +import java.util.concurrent.CancellationException;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.ScheduledFuture;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
     
     /**
    - * SlotManager is responsible for receiving slot requests and do slot 
allocations. It allows to request
    - * slots from registered TaskManagers and issues container allocation 
requests in case of there are not
    - * enough available slots. Besides, it should sync its slot allocation 
with TaskManager's heartbeat.
    - * <p>
    - * The main operation principle of SlotManager is:
    - * <ul>
    - * <li>1. All slot allocation status should be synced with TaskManager, 
which is the ground truth.</li>
    - * <li>2. All slots that have registered must be tracked, either by free 
pool or allocated pool.</li>
    - * <li>3. All slot requests will be handled by best efforts, there is no 
guarantee that one request will be
    - * fulfilled in time or correctly allocated. Conflicts or timeout or some 
special error will happen, it should
    - * be handled outside SlotManager. SlotManager will make each decision 
based on the information it currently
    - * holds.</li>
    - * </ul>
    - * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>.
    + * The slot manager is responsible for maintaining a view on all 
registered task manager slots,
    + * their allocation and all pending slot requests. Whenever a new slot is 
registered or and
    + * allocated slot is freed, then it tries to fulfill another pending slot 
request. Whenever there
    + * are not enough slots available the slot manager will notify the 
resource manager about it via
    + * {@link ResourceManagerActions#allocateResource(ResourceProfile)}.
    + *
    + * In order to free resources and avoid resource leaks, idling task 
managers (task managers whose
    + * slots are currently not used) and not fulfilled pending slot requests 
time out triggering their
    + * release and failure, respectively.
      */
    -public abstract class SlotManager {
    +public class SlotManager implements AutoCloseable {
    +
    +   private static final Logger LOG = 
LoggerFactory.getLogger(SlotManager.class);
    +
    +   /** Scheduled executor for timeouts */
    +   private final ScheduledExecutor scheduledExecutor;
    +
    +   /** Timeout for slot requests to the task manager */
    +   private final Time taskManagerRequestTimeout;
    +
    +   /** Timeout after which an allocation is discarded */
    +   private final Time slotRequestTimeout;
    +
    +   /** Timeout after which an unused TaskManager is released */
    +   private final Time taskManagerTimeout;
    +
    +   /** Map for all registered slots */
    +   private final HashMap<SlotID, TaskManagerSlot> slots;
     
    -   protected final Logger LOG = LoggerFactory.getLogger(getClass());
    +   /** Index of all currently free slots */
    +   private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots;
     
    -   /** The Resource allocation provider */
    -   protected final ResourceManagerServices rmServices;
    +   /** All currently registered task managers */
    +   private final HashMap<InstanceID, TaskManagerRegistration> 
taskManagerRegistrations;
     
    -   /** All registered task managers with ResourceID and gateway. */
    -   private final Map<ResourceID, TaskExecutorRegistration> taskManagers;
    +   /** Map of fulfilled and active allocations for request deduplication 
purposes */
    +   private final HashMap<AllocationID, SlotID> fulfilledSlotRequests;
     
    -   /** All registered slots, including free and allocated slots */
    -   private final Map<ResourceID, Map<SlotID, ResourceSlot>> 
registeredSlots;
    +   /** Map of pending/unfulfilled slot allocation requests */
    +   private final HashMap<AllocationID, PendingSlotRequest> 
pendingSlotRequests;
     
    -   /** All pending slot requests, waiting available slots to fulfil */
    -   private final Map<AllocationID, SlotRequest> pendingSlotRequests;
    +   /** Leader id of the containing component */
    +   private UUID leaderId;
     
    -   /** All free slots that can be used to be allocated */
    -   private final Map<SlotID, ResourceSlot> freeSlots;
    +   /** Executor for future callbacks which have to be "synchronized" */
    +   private Executor mainThreadExecutor;
     
    -   /** All allocations, we can lookup allocations either by SlotID or 
AllocationID */
    -   private final AllocationMap allocationMap;
    +   /** Callbacks for resource (de-)allocations */
    +   private ResourceManagerActions resourceManagerActions;
     
    -   private final Time timeout;
    +   /** True iff the component has been started */
    +   private boolean started;
     
    -   public SlotManager(ResourceManagerServices rmServices) {
    -           this.rmServices = checkNotNull(rmServices);
    -           this.registeredSlots = new HashMap<>(16);
    -           this.pendingSlotRequests = new LinkedHashMap<>(16);
    -           this.freeSlots = new HashMap<>(16);
    -           this.allocationMap = new AllocationMap();
    -           this.taskManagers = new HashMap<>();
    -           this.timeout = Time.seconds(10);
    +   public SlotManager(
    +                   ScheduledExecutor scheduledExecutor,
    +                   Time taskManagerRequestTimeout,
    +                   Time slotRequestTimeout,
    +                   Time taskManagerTimeout) {
    +           this.scheduledExecutor = 
Preconditions.checkNotNull(scheduledExecutor);
    +           this.taskManagerRequestTimeout = 
Preconditions.checkNotNull(taskManagerRequestTimeout);
    +           this.slotRequestTimeout = 
Preconditions.checkNotNull(slotRequestTimeout);
    +           this.taskManagerTimeout = 
Preconditions.checkNotNull(taskManagerTimeout);
    +
    +           slots = new HashMap<>(16);
    +           freeSlots = new LinkedHashMap<>(16);
    +           taskManagerRegistrations = new HashMap<>(4);
    +           fulfilledSlotRequests = new HashMap<>(16);
    +           pendingSlotRequests = new HashMap<>(16);
    +
    +           leaderId = null;
    +           resourceManagerActions = null;
    +           started = false;
        }
     
    -   // 
------------------------------------------------------------------------
    -   //  slot managements
    -   // 
------------------------------------------------------------------------
    +   // 
---------------------------------------------------------------------------------------------
    +   // Component lifecycle methods
    +   // 
---------------------------------------------------------------------------------------------
     
        /**
    -    * Request a slot with requirements, we may either fulfill the request 
or pending it. Trigger container
    -    * allocation if we don't have enough resource. If we have free slot 
which can match the request, record
    -    * this allocation and forward the request to TaskManager through 
ResourceManager (we want this done by
    -    * RPC's main thread to avoid race condition).
    +    * Starts the slot manager with the given leader id and resource 
manager actions.
         *
    -    * @param request The detailed request of the slot
    -    * @return RMSlotRequestRegistered The confirmation message to be send 
to the caller
    +    * @param newLeaderId to use for communication with the task managers
    +    * @param newResourceManagerActions to use for resource (de-)allocations
    +    */
    +   public void start(UUID newLeaderId, Executor newMainThreadExecutor, 
ResourceManagerActions newResourceManagerActions) {
    +           leaderId = Preconditions.checkNotNull(newLeaderId);
    +           mainThreadExecutor = 
Preconditions.checkNotNull(newMainThreadExecutor);
    +           resourceManagerActions = 
Preconditions.checkNotNull(newResourceManagerActions);
    +
    +           started = true;
    +   }
    +
    +   /**
    +    * Suspends the component. This clears the internal state of the slot 
manager.
         */
    -   public RMSlotRequestRegistered requestSlot(final SlotRequest request) {
    -           final AllocationID allocationId = request.getAllocationId();
    -           if (isRequestDuplicated(request)) {
    -                   LOG.warn("Duplicated slot request, AllocationID:{}", 
allocationId);
    -                   return new RMSlotRequestRegistered(allocationId);
    +   public void suspend() {
    +           for (PendingSlotRequest pendingSlotRequest : 
pendingSlotRequests.values()) {
    +                   cancelPendingSlotRequest(pendingSlotRequest);
                }
     
    -           // try to fulfil the request with current free slots
    -           final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
    -           if (slot != null) {
    -                   LOG.info("Assigning SlotID({}) to AllocationID({}), 
JobID:{}", slot.getSlotId(),
    -                           allocationId, request.getJobId());
    +           pendingSlotRequests.clear();
    +
    +           HashSet<InstanceID> registeredTaskManagers = new 
HashSet<>(taskManagerRegistrations.keySet());
    +
    +           for (InstanceID registeredTaskManager : registeredTaskManagers) 
{
    +                   unregisterTaskManager(registeredTaskManager);
    +           }
    +
    +           leaderId = null;
    +           resourceManagerActions = null;
    +           started = false;
    +   }
    +
    +   /**
    +    * Closes the slot manager.
    +    *
    +    * @throws Exception if the close operation fails
    +    */
    +   @Override
    +   public void close() throws Exception {
    +           suspend();
    +   }
    +
    +   // 
---------------------------------------------------------------------------------------------
    +   // Public API
    +   // 
---------------------------------------------------------------------------------------------
    +
    +   /**
    +    * Requests a slot with the respective resource profile.
    +    *
    +    * @param slotRequest specifying the requested slot specs
    +    * @return true if the slot request was registered; false if the 
request is a duplicate
    +    * @throws SlotManagerException if the slot request failed (e.g. not 
enough resources left)
    +    */
    +   public boolean registerSlotRequest(SlotRequest slotRequest) throws 
SlotManagerException {
    +           checkInit();
     
    -                   // record this allocation in bookkeeping
    -                   allocationMap.addAllocation(slot.getSlotId(), 
allocationId);
    -                   // remove selected slot from free pool
    -                   freeSlots.remove(slot.getSlotId());
    +           if (checkDuplicateRequest(slotRequest.getAllocationId())) {
    +                   LOG.debug("Ignoring a duplicate slot request with 
allocation id {}.", slotRequest.getAllocationId());
     
    -                   sendSlotRequest(slot, request);
    +                   return false;
                } else {
    -                   LOG.info("Cannot fulfil slot request, try to allocate a 
new container for it, " +
    -                           "AllocationID:{}, JobID:{}", allocationId, 
request.getJobId());
    -                   Preconditions.checkState(rmServices != null,
    -                           "Attempted to allocate resources but no 
ResourceManagerServices set.");
    -                   
rmServices.allocateResource(request.getResourceProfile());
    -                   pendingSlotRequests.put(allocationId, request);
    +                   PendingSlotRequest pendingSlotRequest = new 
PendingSlotRequest(slotRequest);
    +
    +                   pendingSlotRequests.put(slotRequest.getAllocationId(), 
pendingSlotRequest);
    +
    +                   try {
    +                           internalRequestSlot(pendingSlotRequest);
    +                   } catch (ResourceManagerException e) {
    +                           // requesting the slot failed --> remove 
pending slot request
    +                           
pendingSlotRequests.remove(slotRequest.getAllocationId());
    +
    +                           throw new SlotManagerException("Could not 
fulfill slot request " + slotRequest.getAllocationId() + '.', e);
    +                   }
    +
    +                   return true;
                }
    +   }
     
    -           return new RMSlotRequestRegistered(allocationId);
    +   /**
    +    * Cancels and removes a pending slot request with the given allocation 
id. If there is no such
    +    * pending request, then nothing is done.
    +    *
    +    * @param allocationId identifying the pending slot request
    +    * @return True if a pending slot request was found; otherwise false
    +    */
    +   public boolean unregisterSlotRequest(AllocationID allocationId) {
    +           checkInit();
    +
    +           PendingSlotRequest pendingSlotRequest = 
pendingSlotRequests.remove(allocationId);
    +
    +           if (null != pendingSlotRequest) {
    +                   cancelPendingSlotRequest(pendingSlotRequest);
    +
    +                   return true;
    +           } else {
    +                   LOG.debug("No pending slot request with allocation id 
{} found.", allocationId);
    +
    +                   return false;
    +           }
        }
     
        /**
    -    * Notifies the SlotManager that a slot is available again after being 
allocated.
    -    * @param slotID slot id of available slot
    +    * Registers a new task manager at the slot manager. This will make the 
task managers slots
    +    * known and, thus, available for allocation.
    +    *
    +    * @param taskExecutorConnection for the new task manager
    +    * @param initialSlotReport for the new task manager
         */
    -   public void notifySlotAvailable(ResourceID resourceID, SlotID slotID) {
    -           if (!allocationMap.isAllocated(slotID)) {
    -                   throw new IllegalStateException("Slot was not 
previously allocated but " +
    -                           "TaskManager reports it as available again");
    +   public void registerTaskManager(final TaskExecutorConnection 
taskExecutorConnection, SlotReport initialSlotReport) {
    +           checkInit();
    +
    +           // we identify task managers by their instance id
    +           if 
(!taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) 
{
    +                   TaskManagerRegistration taskManagerRegistration = new 
TaskManagerRegistration(taskExecutorConnection);
    +                   
taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), 
taskManagerRegistration);
                }
    -           allocationMap.removeAllocation(slotID);
    -           final Map<SlotID, ResourceSlot> slots = 
registeredSlots.get(resourceID);
    -           ResourceSlot freeSlot = slots.get(slotID);
    -           if (freeSlot == null) {
    -                   throw new IllegalStateException("Slot was not 
registered with SlotManager but " +
    -                           "TaskManager reported it to be available.");
    +
    +           reportSlotStatus(taskExecutorConnection.getInstanceID(), 
initialSlotReport);
    +   }
    +
    +   /**
    +    * Unregisters the task manager identified by the given instance id and 
its associated slots
    +    * from the slot manager.
    +    *
    +    * @param instanceId identifying the task manager to unregister
    +    * @return True if there existed a registered task manager with the 
given instance id
    +    */
    +   public boolean unregisterTaskManager(InstanceID instanceId) {
    +           checkInit();
    +
    +           TaskManagerRegistration taskManagerRegistration = 
taskManagerRegistrations.remove(instanceId);
    +
    +           if (null != taskManagerRegistration) {
    +                   removeSlots(taskManagerRegistration.getSlots());
    +
    +                   taskManagerRegistration.cancelTimeout();
    +
    +                   return true;
    +           } else {
    +                   LOG.debug("There is no task manager registered with 
instance ID {}. Ignoring this message.", instanceId);
    +
    +                   return false;
                }
    -           handleFreeSlot(freeSlot);
        }
     
        /**
    -    * The slot request to TaskManager may be either failed by rpc 
communication (timeout, network error, etc.)
    -    * or really rejected by TaskManager. We shall retry this request by:
    -    * <ul>
    -    * <li>1. verify and clear all the previous allocate information for 
this request
    -    * <li>2. try to request slot again
    -    * </ul>
    -    * <p>
    -    * This may cause some duplicate allocation, e.g. the slot request to 
TaskManager is successful but the response
    -    * is lost somehow, so we may request a slot in another TaskManager, 
this causes two slots assigned to one request,
    -    * but it can be taken care of by rejecting registration at JobManager.
    +    * Reports the current slot allocations for a task manager identified 
by the given instance id.
         *
    -    * @param originalRequest The original slot request
    -    * @param slotId          The target SlotID
    +    * @param instanceId identifying the task manager for which to report 
the slot status
    +    * @param slotReport containing the status for all of its slots
    +    * @return true if the slot status has been updated successfully, 
otherwise false
         */
    -   void handleSlotRequestFailedAtTaskManager(final SlotRequest 
originalRequest, final SlotID slotId) {
    -           final AllocationID originalAllocationId = 
originalRequest.getAllocationId();
    -           LOG.info("Slot request failed at TaskManager, SlotID:{}, 
AllocationID:{}, JobID:{}",
    -                   slotId, originalAllocationId, 
originalRequest.getJobId());
    -
    -           if (allocationMap.isAllocated(slotId)) {
    -                   final AllocationID expectedAllocationId = 
allocationMap.getAllocationID(slotId);
    -
    -                   // check whether we have an agreement on whom this slot 
belongs to
    -                   if (originalAllocationId.equals(expectedAllocationId)) {
    -                           LOG.info("De-allocate this request and retry");
    -                           
allocationMap.removeAllocation(expectedAllocationId);
    -                           
pendingSlotRequests.put(originalRequest.getAllocationId(), originalRequest);
    -                           ResourceSlot slot = 
checkNotNull(getRegisteredSlot(slotId));
    -                           // treat this slot as empty and retry with a 
different request
    -                           handleFreeSlot(slot);
    +   public boolean reportSlotStatus(InstanceID instanceId, SlotReport 
slotReport) {
    +           checkInit();
    +
    +           TaskManagerRegistration taskManagerRegistration = 
taskManagerRegistrations.get(instanceId);
    +
    +           if (null != taskManagerRegistration) {
    +                   HashSet<SlotID> slotsToRemove = new 
HashSet<>(taskManagerRegistration.getSlots());
    +                   boolean idle = true;
    +
    +                   for (SlotStatus slotStatus : slotReport) {
    +                           if 
(slotsToRemove.remove(slotStatus.getSlotID())) {
    +                                   // slot which was already registered
    +                                   updateSlot(slotStatus.getSlotID(), 
slotStatus.getAllocationID());
    +                           } else {
    +                                   // new slot
    +                                   registerSlot(
    +                                           instanceId,
    +                                           slotStatus.getSlotID(),
    +                                           slotStatus.getAllocationID(),
    +                                           slotStatus.getResourceProfile(),
    +                                           
taskManagerRegistration.getTaskManagerConnection());
    +                           }
    +
    +                           TaskManagerSlot slot = 
slots.get(slotStatus.getSlotID());
    +
    +                           idle &= slot.isFree();
    +                   }
    +
    +                   // remove the slots for which we haven't received a 
slot status message
    +                   removeSlots(slotsToRemove);
    +
    +                   if (idle) {
    --- End diff --
    
    I assume this breaks the TaskManager timeouts. Every time a heartbeat arrives (carrying a slot report), the TaskManager is detected as idle and the timeout is scheduled again, overriding the previous timeout and thereby pushing it further back.


> Harden SlotManager
> ------------------
>
>                 Key: FLINK-5810
>                 URL: https://issues.apache.org/jira/browse/FLINK-5810
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination
>    Affects Versions: 1.3.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>
> Harden the {{SlotManager}} logic to better cope with lost messages.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to