[ https://issues.apache.org/jira/browse/FLINK-5810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15928748#comment-15928748 ]
ASF GitHub Bot commented on FLINK-5810: --------------------------------------- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3394#discussion_r106512992 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java --- @@ -21,519 +21,897 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.clusterframework.types.ResourceSlot; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot; import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.concurrent.BiFunction; +import org.apache.flink.runtime.concurrent.CompletableFuture; import org.apache.flink.runtime.concurrent.Future; -import org.apache.flink.runtime.resourcemanager.ResourceManagerServices; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.resourcemanager.SlotRequest; -import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRegistered; -import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected; -import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply; -import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection; import 
org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.SlotStatus; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException; +import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; - -import static org.apache.flink.util.Preconditions.checkNotNull; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.CancellationException; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** - * SlotManager is responsible for receiving slot requests and do slot allocations. It allows to request - * slots from registered TaskManagers and issues container allocation requests in case of there are not - * enough available slots. Besides, it should sync its slot allocation with TaskManager's heartbeat. - * <p> - * The main operation principle of SlotManager is: - * <ul> - * <li>1. All slot allocation status should be synced with TaskManager, which is the ground truth.</li> - * <li>2. All slots that have registered must be tracked, either by free pool or allocated pool.</li> - * <li>3. All slot requests will be handled by best efforts, there is no guarantee that one request will be - * fulfilled in time or correctly allocated. Conflicts or timeout or some special error will happen, it should - * be handled outside SlotManager. SlotManager will make each decision based on the information it currently - * holds.</li> - * </ul> - * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>. 
+ * The slot manager is responsible for maintaining a view on all registered task manager slots, + * their allocation and all pending slot requests. Whenever a new slot is registered or an + * allocated slot is freed, then it tries to fulfill another pending slot request. Whenever there + * are not enough slots available the slot manager will notify the resource manager about it via + * {@link ResourceManagerActions#allocateResource(ResourceProfile)}. + * + * In order to free resources and avoid resource leaks, idling task managers (task managers whose + * slots are currently not used) and unfulfilled pending slot requests time out, triggering their + * release and failure, respectively. */ -public abstract class SlotManager { +public class SlotManager implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class); + + /** Scheduled executor for timeouts */ + private final ScheduledExecutor scheduledExecutor; + + /** Timeout for slot requests to the task manager */ + private final Time taskManagerRequestTimeout; + + /** Timeout after which an allocation is discarded */ + private final Time slotRequestTimeout; + + /** Timeout after which an unused TaskManager is released */ + private final Time taskManagerTimeout; + + /** Map for all registered slots */ + private final HashMap<SlotID, TaskManagerSlot> slots; - protected final Logger LOG = LoggerFactory.getLogger(getClass()); + /** Index of all currently free slots */ + private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots; - /** The Resource allocation provider */ - protected final ResourceManagerServices rmServices; + /** All currently registered task managers */ + private final HashMap<InstanceID, TaskManagerRegistration> taskManagerRegistrations; - /** All registered task managers with ResourceID and gateway.
*/ - private final Map<ResourceID, TaskExecutorRegistration> taskManagers; + /** Map of fulfilled and active allocations for request deduplication purposes */ + private final HashMap<AllocationID, SlotID> fulfilledSlotRequests; - /** All registered slots, including free and allocated slots */ - private final Map<ResourceID, Map<SlotID, ResourceSlot>> registeredSlots; + /** Map of pending/unfulfilled slot allocation requests */ + private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests; - /** All pending slot requests, waiting available slots to fulfil */ - private final Map<AllocationID, SlotRequest> pendingSlotRequests; + /** Leader id of the containing component */ + private UUID leaderId; - /** All free slots that can be used to be allocated */ - private final Map<SlotID, ResourceSlot> freeSlots; + /** Executor for future callbacks which have to be "synchronized" */ + private Executor mainThreadExecutor; - /** All allocations, we can lookup allocations either by SlotID or AllocationID */ - private final AllocationMap allocationMap; + /** Callbacks for resource (de-)allocations */ + private ResourceManagerActions resourceManagerActions; - private final Time timeout; + /** True iff the component has been started */ + private boolean started; - public SlotManager(ResourceManagerServices rmServices) { - this.rmServices = checkNotNull(rmServices); - this.registeredSlots = new HashMap<>(16); - this.pendingSlotRequests = new LinkedHashMap<>(16); - this.freeSlots = new HashMap<>(16); - this.allocationMap = new AllocationMap(); - this.taskManagers = new HashMap<>(); - this.timeout = Time.seconds(10); + public SlotManager( + ScheduledExecutor scheduledExecutor, + Time taskManagerRequestTimeout, + Time slotRequestTimeout, + Time taskManagerTimeout) { + this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor); + this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout); + this.slotRequestTimeout = 
Preconditions.checkNotNull(slotRequestTimeout); + this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout); + + slots = new HashMap<>(16); + freeSlots = new LinkedHashMap<>(16); + taskManagerRegistrations = new HashMap<>(4); + fulfilledSlotRequests = new HashMap<>(16); + pendingSlotRequests = new HashMap<>(16); + + leaderId = null; + resourceManagerActions = null; + started = false; } - // ------------------------------------------------------------------------ - // slot managements - // ------------------------------------------------------------------------ + // --------------------------------------------------------------------------------------------- + // Component lifecycle methods + // --------------------------------------------------------------------------------------------- /** - * Request a slot with requirements, we may either fulfill the request or pending it. Trigger container - * allocation if we don't have enough resource. If we have free slot which can match the request, record - * this allocation and forward the request to TaskManager through ResourceManager (we want this done by - * RPC's main thread to avoid race condition). + * Starts the slot manager with the given leader id and resource manager actions. * - * @param request The detailed request of the slot - * @return RMSlotRequestRegistered The confirmation message to be send to the caller + * @param newLeaderId to use for communication with the task managers + * @param newResourceManagerActions to use for resource (de-)allocations + */ + public void start(UUID newLeaderId, Executor newMainThreadExecutor, ResourceManagerActions newResourceManagerActions) { + leaderId = Preconditions.checkNotNull(newLeaderId); + mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor); + resourceManagerActions = Preconditions.checkNotNull(newResourceManagerActions); + + started = true; + } + + /** + * Suspends the component. This clears the internal state of the slot manager. 
*/ - public RMSlotRequestRegistered requestSlot(final SlotRequest request) { - final AllocationID allocationId = request.getAllocationId(); - if (isRequestDuplicated(request)) { - LOG.warn("Duplicated slot request, AllocationID:{}", allocationId); - return new RMSlotRequestRegistered(allocationId); + public void suspend() { + for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) { + cancelPendingSlotRequest(pendingSlotRequest); } - // try to fulfil the request with current free slots - final ResourceSlot slot = chooseSlotToUse(request, freeSlots); - if (slot != null) { - LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(), - allocationId, request.getJobId()); + pendingSlotRequests.clear(); + + HashSet<InstanceID> registeredTaskManagers = new HashSet<>(taskManagerRegistrations.keySet()); + + for (InstanceID registeredTaskManager : registeredTaskManagers) { + unregisterTaskManager(registeredTaskManager); + } + + leaderId = null; + resourceManagerActions = null; + started = false; + } + + /** + * Closes the slot manager. + * + * @throws Exception if the close operation fails + */ + @Override + public void close() throws Exception { + suspend(); + } + + // --------------------------------------------------------------------------------------------- + // Public API + // --------------------------------------------------------------------------------------------- + + /** + * Requests a slot with the respective resource profile. + * + * @param slotRequest specifying the requested slot specs + * @return true if the slot request was registered; false if the request is a duplicate + * @throws SlotManagerException if the slot request failed (e.g. 
not enough resources left) + */ + public boolean registerSlotRequest(SlotRequest slotRequest) throws SlotManagerException { + checkInit(); - // record this allocation in bookkeeping - allocationMap.addAllocation(slot.getSlotId(), allocationId); - // remove selected slot from free pool - freeSlots.remove(slot.getSlotId()); + if (checkDuplicateRequest(slotRequest.getAllocationId())) { + LOG.debug("Ignoring a duplicate slot request with allocation id {}.", slotRequest.getAllocationId()); - sendSlotRequest(slot, request); + return false; } else { - LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " + - "AllocationID:{}, JobID:{}", allocationId, request.getJobId()); - Preconditions.checkState(rmServices != null, - "Attempted to allocate resources but no ResourceManagerServices set."); - rmServices.allocateResource(request.getResourceProfile()); - pendingSlotRequests.put(allocationId, request); + PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest); + + pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest); + + try { + internalRequestSlot(pendingSlotRequest); + } catch (ResourceManagerException e) { + // requesting the slot failed --> remove pending slot request + pendingSlotRequests.remove(slotRequest.getAllocationId()); + + throw new SlotManagerException("Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e); + } + + return true; } + } - return new RMSlotRequestRegistered(allocationId); + /** + * Cancels and removes a pending slot request with the given allocation id. If there is no such + * pending request, then nothing is done. 
+ * + * @param allocationId identifying the pending slot request + * @return True if a pending slot request was found; otherwise false + */ + public boolean unregisterSlotRequest(AllocationID allocationId) { + checkInit(); + + PendingSlotRequest pendingSlotRequest = pendingSlotRequests.remove(allocationId); + + if (null != pendingSlotRequest) { + cancelPendingSlotRequest(pendingSlotRequest); + + return true; + } else { + LOG.debug("No pending slot request with allocation id {} found.", allocationId); + + return false; + } } /** - * Notifies the SlotManager that a slot is available again after being allocated. - * @param slotID slot id of available slot + * Registers a new task manager at the slot manager. This will make the task managers slots + * known and, thus, available for allocation. + * + * @param taskExecutorConnection for the new task manager + * @param initialSlotReport for the new task manager */ - public void notifySlotAvailable(ResourceID resourceID, SlotID slotID) { - if (!allocationMap.isAllocated(slotID)) { - throw new IllegalStateException("Slot was not previously allocated but " + - "TaskManager reports it as available again"); + public void registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) { + checkInit(); + + // we identify task managers by their instance id + if (!taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) { + TaskManagerRegistration taskManagerRegistration = new TaskManagerRegistration(taskExecutorConnection); + taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), taskManagerRegistration); } - allocationMap.removeAllocation(slotID); - final Map<SlotID, ResourceSlot> slots = registeredSlots.get(resourceID); - ResourceSlot freeSlot = slots.get(slotID); - if (freeSlot == null) { - throw new IllegalStateException("Slot was not registered with SlotManager but " + - "TaskManager reported it to be available."); + + 
reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport); + } + + /** + * Unregisters the task manager identified by the given instance id and its associated slots + * from the slot manager. + * + * @param instanceId identifying the task manager to unregister + * @return True if there existed a registered task manager with the given instance id + */ + public boolean unregisterTaskManager(InstanceID instanceId) { + checkInit(); + + TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.remove(instanceId); + + if (null != taskManagerRegistration) { + removeSlots(taskManagerRegistration.getSlots()); + + taskManagerRegistration.cancelTimeout(); + + return true; + } else { + LOG.debug("There is no task manager registered with instance ID {}. Ignoring this message.", instanceId); + + return false; } - handleFreeSlot(freeSlot); } /** - * The slot request to TaskManager may be either failed by rpc communication (timeout, network error, etc.) - * or really rejected by TaskManager. We shall retry this request by: - * <ul> - * <li>1. verify and clear all the previous allocate information for this request - * <li>2. try to request slot again - * </ul> - * <p> - * This may cause some duplicate allocation, e.g. the slot request to TaskManager is successful but the response - * is lost somehow, so we may request a slot in another TaskManager, this causes two slots assigned to one request, - * but it can be taken care of by rejecting registration at JobManager. + * Reports the current slot allocations for a task manager identified by the given instance id. 
* - * @param originalRequest The original slot request - * @param slotId The target SlotID + * @param instanceId identifying the task manager for which to report the slot status + * @param slotReport containing the status for all of its slots + * @return true if the slot status has been updated successfully, otherwise false */ - void handleSlotRequestFailedAtTaskManager(final SlotRequest originalRequest, final SlotID slotId) { - final AllocationID originalAllocationId = originalRequest.getAllocationId(); - LOG.info("Slot request failed at TaskManager, SlotID:{}, AllocationID:{}, JobID:{}", - slotId, originalAllocationId, originalRequest.getJobId()); - - if (allocationMap.isAllocated(slotId)) { - final AllocationID expectedAllocationId = allocationMap.getAllocationID(slotId); - - // check whether we have an agreement on whom this slot belongs to - if (originalAllocationId.equals(expectedAllocationId)) { - LOG.info("De-allocate this request and retry"); - allocationMap.removeAllocation(expectedAllocationId); - pendingSlotRequests.put(originalRequest.getAllocationId(), originalRequest); - ResourceSlot slot = checkNotNull(getRegisteredSlot(slotId)); - // treat this slot as empty and retry with a different request - handleFreeSlot(slot); + public boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport) { + checkInit(); + + TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId); + + if (null != taskManagerRegistration) { + HashSet<SlotID> slotsToRemove = new HashSet<>(taskManagerRegistration.getSlots()); + boolean idle = true; + + for (SlotStatus slotStatus : slotReport) { + if (slotsToRemove.remove(slotStatus.getSlotID())) { + // slot which was already registered + updateSlot(slotStatus.getSlotID(), slotStatus.getAllocationID()); + } else { + // new slot + registerSlot( + instanceId, + slotStatus.getSlotID(), + slotStatus.getAllocationID(), + slotStatus.getResourceProfile(), + 
taskManagerRegistration.getTaskManagerConnection()); + } + + TaskManagerSlot slot = slots.get(slotStatus.getSlotID()); + + idle &= slot.isFree(); + } + + // remove the slots for which we haven't received a slot status message + removeSlots(slotsToRemove); + + if (idle) { --- End diff -- I assume this breaks the TaskManager timeouts. Every time a heartbeat comes (with a slot report), the TaskManager is detected to be idle, and the timeout is scheduled, overriding the previous timeout (pushing the timeout further back). > Harden SlotManager > ------------------ > > Key: FLINK-5810 > URL: https://issues.apache.org/jira/browse/FLINK-5810 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination > Affects Versions: 1.3.0 > Reporter: Till Rohrmann > Assignee: Till Rohrmann > > Harden the {{SlotManager}} logic to better cope with lost messages. -- This message was sent by Atlassian JIRA (v6.3.15#6346)