[ 
https://issues.apache.org/jira/browse/FLINK-5810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15928750#comment-15928750
 ] 

ASF GitHub Bot commented on FLINK-5810:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3394#discussion_r106507099
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ---
    @@ -21,519 +21,897 @@
     import org.apache.flink.annotation.VisibleForTesting;
     import org.apache.flink.api.common.time.Time;
     import org.apache.flink.runtime.clusterframework.types.AllocationID;
    -import org.apache.flink.runtime.clusterframework.types.ResourceID;
    -import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
    +import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
    +import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
     import org.apache.flink.runtime.clusterframework.types.SlotID;
     import org.apache.flink.runtime.concurrent.BiFunction;
    +import org.apache.flink.runtime.concurrent.CompletableFuture;
     import org.apache.flink.runtime.concurrent.Future;
    -import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
    +import org.apache.flink.runtime.concurrent.ScheduledExecutor;
    +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.Acknowledge;
     import org.apache.flink.runtime.resourcemanager.SlotRequest;
    -import 
org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRegistered;
    -import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
    -import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
    -import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration;
    +import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
    +import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
     import org.apache.flink.runtime.taskexecutor.SlotReport;
     import org.apache.flink.runtime.taskexecutor.SlotStatus;
    +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
    +import 
org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
    +import 
org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
     import org.apache.flink.util.Preconditions;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
     import java.util.LinkedHashMap;
     import java.util.Map;
    -
    -import static org.apache.flink.util.Preconditions.checkNotNull;
    +import java.util.Objects;
    +import java.util.UUID;
    +import java.util.concurrent.CancellationException;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.ScheduledFuture;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
     
     /**
    - * SlotManager is responsible for receiving slot requests and do slot 
allocations. It allows to request
    - * slots from registered TaskManagers and issues container allocation 
requests in case of there are not
    - * enough available slots. Besides, it should sync its slot allocation 
with TaskManager's heartbeat.
    - * <p>
    - * The main operation principle of SlotManager is:
    - * <ul>
    - * <li>1. All slot allocation status should be synced with TaskManager, 
which is the ground truth.</li>
    - * <li>2. All slots that have registered must be tracked, either by free 
pool or allocated pool.</li>
    - * <li>3. All slot requests will be handled by best efforts, there is no 
guarantee that one request will be
    - * fulfilled in time or correctly allocated. Conflicts or timeout or some 
special error will happen, it should
    - * be handled outside SlotManager. SlotManager will make each decision 
based on the information it currently
    - * holds.</li>
    - * </ul>
    - * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>.
    + * The slot manager is responsible for maintaining a view on all 
registered task manager slots,
    + * their allocation and all pending slot requests. Whenever a new slot is 
registered or an
    + * allocated slot is freed, then it tries to fulfill another pending slot 
request. Whenever there
    + * are not enough slots available the slot manager will notify the 
resource manager about it via
    + * {@link ResourceManagerActions#allocateResource(ResourceProfile)}.
    + *
    + * In order to free resources and avoid resource leaks, idling task 
managers (task managers whose
    + * slots are currently not used) and not fulfilled pending slot requests 
time out triggering their
    + * release and failure, respectively.
      */
    -public abstract class SlotManager {
    +public class SlotManager implements AutoCloseable {
    +
    +   private static final Logger LOG = 
LoggerFactory.getLogger(SlotManager.class);
    +
    +   /** Scheduled executor for timeouts */
    +   private final ScheduledExecutor scheduledExecutor;
    +
    +   /** Timeout for slot requests to the task manager */
    +   private final Time taskManagerRequestTimeout;
    +
    +   /** Timeout after which an allocation is discarded */
    +   private final Time slotRequestTimeout;
    +
    +   /** Timeout after which an unused TaskManager is released */
    +   private final Time taskManagerTimeout;
    +
    +   /** Map for all registered slots */
    +   private final HashMap<SlotID, TaskManagerSlot> slots;
     
    -   protected final Logger LOG = LoggerFactory.getLogger(getClass());
    +   /** Index of all currently free slots */
    +   private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots;
     
    -   /** The Resource allocation provider */
    -   protected final ResourceManagerServices rmServices;
    +   /** All currently registered task managers */
    +   private final HashMap<InstanceID, TaskManagerRegistration> 
taskManagerRegistrations;
     
    -   /** All registered task managers with ResourceID and gateway. */
    -   private final Map<ResourceID, TaskExecutorRegistration> taskManagers;
    +   /** Map of fulfilled and active allocations for request deduplication 
purposes */
    +   private final HashMap<AllocationID, SlotID> fulfilledSlotRequests;
     
    -   /** All registered slots, including free and allocated slots */
    -   private final Map<ResourceID, Map<SlotID, ResourceSlot>> 
registeredSlots;
    +   /** Map of pending/unfulfilled slot allocation requests */
    +   private final HashMap<AllocationID, PendingSlotRequest> 
pendingSlotRequests;
     
    -   /** All pending slot requests, waiting available slots to fulfil */
    -   private final Map<AllocationID, SlotRequest> pendingSlotRequests;
    +   /** Leader id of the containing component */
    +   private UUID leaderId;
     
    -   /** All free slots that can be used to be allocated */
    -   private final Map<SlotID, ResourceSlot> freeSlots;
    +   /** Executor for future callbacks which have to be "synchronized" */
    +   private Executor mainThreadExecutor;
     
    -   /** All allocations, we can lookup allocations either by SlotID or 
AllocationID */
    -   private final AllocationMap allocationMap;
    +   /** Callbacks for resource (de-)allocations */
    +   private ResourceManagerActions resourceManagerActions;
     
    -   private final Time timeout;
    +   /** True iff the component has been started */
    +   private boolean started;
     
    -   public SlotManager(ResourceManagerServices rmServices) {
    -           this.rmServices = checkNotNull(rmServices);
    -           this.registeredSlots = new HashMap<>(16);
    -           this.pendingSlotRequests = new LinkedHashMap<>(16);
    -           this.freeSlots = new HashMap<>(16);
    -           this.allocationMap = new AllocationMap();
    -           this.taskManagers = new HashMap<>();
    -           this.timeout = Time.seconds(10);
    +   public SlotManager(
    +                   ScheduledExecutor scheduledExecutor,
    +                   Time taskManagerRequestTimeout,
    +                   Time slotRequestTimeout,
    +                   Time taskManagerTimeout) {
    +           this.scheduledExecutor = 
Preconditions.checkNotNull(scheduledExecutor);
    +           this.taskManagerRequestTimeout = 
Preconditions.checkNotNull(taskManagerRequestTimeout);
    +           this.slotRequestTimeout = 
Preconditions.checkNotNull(slotRequestTimeout);
    +           this.taskManagerTimeout = 
Preconditions.checkNotNull(taskManagerTimeout);
    +
    +           slots = new HashMap<>(16);
    +           freeSlots = new LinkedHashMap<>(16);
    +           taskManagerRegistrations = new HashMap<>(4);
    +           fulfilledSlotRequests = new HashMap<>(16);
    +           pendingSlotRequests = new HashMap<>(16);
    +
    +           leaderId = null;
    +           resourceManagerActions = null;
    +           started = false;
        }
     
    -   // 
------------------------------------------------------------------------
    -   //  slot managements
    -   // 
------------------------------------------------------------------------
    +   // 
---------------------------------------------------------------------------------------------
    +   // Component lifecycle methods
    +   // 
---------------------------------------------------------------------------------------------
     
        /**
    -    * Request a slot with requirements, we may either fulfill the request 
or pending it. Trigger container
    -    * allocation if we don't have enough resource. If we have free slot 
which can match the request, record
    -    * this allocation and forward the request to TaskManager through 
ResourceManager (we want this done by
    -    * RPC's main thread to avoid race condition).
    +    * Starts the slot manager with the given leader id and resource 
manager actions.
         *
    -    * @param request The detailed request of the slot
    -    * @return RMSlotRequestRegistered The confirmation message to be send 
to the caller
    +    * @param newLeaderId to use for communication with the task managers
    +    * @param newResourceManagerActions to use for resource (de-)allocations
    +    */
    +   public void start(UUID newLeaderId, Executor newMainThreadExecutor, 
ResourceManagerActions newResourceManagerActions) {
    +           leaderId = Preconditions.checkNotNull(newLeaderId);
    +           mainThreadExecutor = 
Preconditions.checkNotNull(newMainThreadExecutor);
    +           resourceManagerActions = 
Preconditions.checkNotNull(newResourceManagerActions);
    +
    +           started = true;
    +   }
    +
    +   /**
    +    * Suspends the component. This clears the internal state of the slot 
manager.
         */
    -   public RMSlotRequestRegistered requestSlot(final SlotRequest request) {
    -           final AllocationID allocationId = request.getAllocationId();
    -           if (isRequestDuplicated(request)) {
    -                   LOG.warn("Duplicated slot request, AllocationID:{}", 
allocationId);
    -                   return new RMSlotRequestRegistered(allocationId);
    +   public void suspend() {
    +           for (PendingSlotRequest pendingSlotRequest : 
pendingSlotRequests.values()) {
    +                   cancelPendingSlotRequest(pendingSlotRequest);
                }
     
    -           // try to fulfil the request with current free slots
    -           final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
    -           if (slot != null) {
    -                   LOG.info("Assigning SlotID({}) to AllocationID({}), 
JobID:{}", slot.getSlotId(),
    -                           allocationId, request.getJobId());
    +           pendingSlotRequests.clear();
    +
    +           HashSet<InstanceID> registeredTaskManagers = new 
HashSet<>(taskManagerRegistrations.keySet());
    --- End diff --
    
    If the copy of the collection is just for concurrent iteration, I would 
make it an `ArrayList`. It builds faster and iterates faster than a `HashSet`.


> Harden SlotManager
> ------------------
>
>                 Key: FLINK-5810
>                 URL: https://issues.apache.org/jira/browse/FLINK-5810
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination
>    Affects Versions: 1.3.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>
> Harden the {{SlotManager}} logic to better cope with lost messages.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to