[ https://issues.apache.org/jira/browse/FLINK-5810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15928747#comment-15928747 ]
ASF GitHub Bot commented on FLINK-5810: --------------------------------------- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3394#discussion_r106515494 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java --- @@ -21,519 +21,897 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.clusterframework.types.ResourceSlot; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot; import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.concurrent.BiFunction; +import org.apache.flink.runtime.concurrent.CompletableFuture; import org.apache.flink.runtime.concurrent.Future; -import org.apache.flink.runtime.resourcemanager.ResourceManagerServices; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.resourcemanager.SlotRequest; -import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRegistered; -import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected; -import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply; -import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection; import 
org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.SlotStatus; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException; +import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; - -import static org.apache.flink.util.Preconditions.checkNotNull; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.CancellationException; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** - * SlotManager is responsible for receiving slot requests and do slot allocations. It allows to request - * slots from registered TaskManagers and issues container allocation requests in case of there are not - * enough available slots. Besides, it should sync its slot allocation with TaskManager's heartbeat. - * <p> - * The main operation principle of SlotManager is: - * <ul> - * <li>1. All slot allocation status should be synced with TaskManager, which is the ground truth.</li> - * <li>2. All slots that have registered must be tracked, either by free pool or allocated pool.</li> - * <li>3. All slot requests will be handled by best efforts, there is no guarantee that one request will be - * fulfilled in time or correctly allocated. Conflicts or timeout or some special error will happen, it should - * be handled outside SlotManager. SlotManager will make each decision based on the information it currently - * holds.</li> - * </ul> - * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>. 
+ * The slot manager is responsible for maintaining a view on all registered task manager slots, + * their allocation and all pending slot requests. Whenever a new slot is registered or and + * allocated slot is freed, then it tries to fulfill another pending slot request. Whenever there + * are not enough slots available the slot manager will notify the resource manager about it via + * {@link ResourceManagerActions#allocateResource(ResourceProfile)}. + * + * In order to free resources and avoid resource leaks, idling task managers (task managers whose + * slots are currently not used) and not fulfilled pending slot requests time out triggering their + * release and failure, respectively. */ -public abstract class SlotManager { +public class SlotManager implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class); + + /** Scheduled executor for timeouts */ + private final ScheduledExecutor scheduledExecutor; + + /** Timeout for slot requests to the task manager */ + private final Time taskManagerRequestTimeout; + + /** Timeout after which an allocation is discarded */ + private final Time slotRequestTimeout; + + /** Timeout after which an unused TaskManager is released */ + private final Time taskManagerTimeout; + + /** Map for all registered slots */ + private final HashMap<SlotID, TaskManagerSlot> slots; - protected final Logger LOG = LoggerFactory.getLogger(getClass()); + /** Index of all currently free slots */ + private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots; - /** The Resource allocation provider */ - protected final ResourceManagerServices rmServices; + /** All currently registered task managers */ + private final HashMap<InstanceID, TaskManagerRegistration> taskManagerRegistrations; - /** All registered task managers with ResourceID and gateway. 
*/ - private final Map<ResourceID, TaskExecutorRegistration> taskManagers; + /** Map of fulfilled and active allocations for request deduplication purposes */ + private final HashMap<AllocationID, SlotID> fulfilledSlotRequests; - /** All registered slots, including free and allocated slots */ - private final Map<ResourceID, Map<SlotID, ResourceSlot>> registeredSlots; + /** Map of pending/unfulfilled slot allocation requests */ + private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests; - /** All pending slot requests, waiting available slots to fulfil */ - private final Map<AllocationID, SlotRequest> pendingSlotRequests; + /** Leader id of the containing component */ + private UUID leaderId; - /** All free slots that can be used to be allocated */ - private final Map<SlotID, ResourceSlot> freeSlots; + /** Executor for future callbacks which have to be "synchronized" */ + private Executor mainThreadExecutor; - /** All allocations, we can lookup allocations either by SlotID or AllocationID */ - private final AllocationMap allocationMap; + /** Callbacks for resource (de-)allocations */ + private ResourceManagerActions resourceManagerActions; - private final Time timeout; + /** True iff the component has been started */ + private boolean started; - public SlotManager(ResourceManagerServices rmServices) { - this.rmServices = checkNotNull(rmServices); - this.registeredSlots = new HashMap<>(16); - this.pendingSlotRequests = new LinkedHashMap<>(16); - this.freeSlots = new HashMap<>(16); - this.allocationMap = new AllocationMap(); - this.taskManagers = new HashMap<>(); - this.timeout = Time.seconds(10); + public SlotManager( + ScheduledExecutor scheduledExecutor, + Time taskManagerRequestTimeout, + Time slotRequestTimeout, + Time taskManagerTimeout) { + this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor); + this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout); + this.slotRequestTimeout = 
Preconditions.checkNotNull(slotRequestTimeout); + this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout); + + slots = new HashMap<>(16); + freeSlots = new LinkedHashMap<>(16); + taskManagerRegistrations = new HashMap<>(4); + fulfilledSlotRequests = new HashMap<>(16); + pendingSlotRequests = new HashMap<>(16); + + leaderId = null; + resourceManagerActions = null; + started = false; } - // ------------------------------------------------------------------------ - // slot managements - // ------------------------------------------------------------------------ + // --------------------------------------------------------------------------------------------- + // Component lifecycle methods + // --------------------------------------------------------------------------------------------- /** - * Request a slot with requirements, we may either fulfill the request or pending it. Trigger container - * allocation if we don't have enough resource. If we have free slot which can match the request, record - * this allocation and forward the request to TaskManager through ResourceManager (we want this done by - * RPC's main thread to avoid race condition). + * Starts the slot manager with the given leader id and resource manager actions. * - * @param request The detailed request of the slot - * @return RMSlotRequestRegistered The confirmation message to be send to the caller + * @param newLeaderId to use for communication with the task managers + * @param newResourceManagerActions to use for resource (de-)allocations + */ + public void start(UUID newLeaderId, Executor newMainThreadExecutor, ResourceManagerActions newResourceManagerActions) { + leaderId = Preconditions.checkNotNull(newLeaderId); + mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor); + resourceManagerActions = Preconditions.checkNotNull(newResourceManagerActions); + + started = true; + } + + /** + * Suspends the component. This clears the internal state of the slot manager. 
*/ - public RMSlotRequestRegistered requestSlot(final SlotRequest request) { - final AllocationID allocationId = request.getAllocationId(); - if (isRequestDuplicated(request)) { - LOG.warn("Duplicated slot request, AllocationID:{}", allocationId); - return new RMSlotRequestRegistered(allocationId); + public void suspend() { + for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) { + cancelPendingSlotRequest(pendingSlotRequest); } - // try to fulfil the request with current free slots - final ResourceSlot slot = chooseSlotToUse(request, freeSlots); - if (slot != null) { - LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(), - allocationId, request.getJobId()); + pendingSlotRequests.clear(); + + HashSet<InstanceID> registeredTaskManagers = new HashSet<>(taskManagerRegistrations.keySet()); + + for (InstanceID registeredTaskManager : registeredTaskManagers) { + unregisterTaskManager(registeredTaskManager); + } + + leaderId = null; + resourceManagerActions = null; + started = false; + } + + /** + * Closes the slot manager. + * + * @throws Exception if the close operation fails + */ + @Override + public void close() throws Exception { + suspend(); + } + + // --------------------------------------------------------------------------------------------- + // Public API + // --------------------------------------------------------------------------------------------- + + /** + * Requests a slot with the respective resource profile. + * + * @param slotRequest specifying the requested slot specs + * @return true if the slot request was registered; false if the request is a duplicate + * @throws SlotManagerException if the slot request failed (e.g. 
not enough resources left) + */ + public boolean registerSlotRequest(SlotRequest slotRequest) throws SlotManagerException { + checkInit(); - // record this allocation in bookkeeping - allocationMap.addAllocation(slot.getSlotId(), allocationId); - // remove selected slot from free pool - freeSlots.remove(slot.getSlotId()); + if (checkDuplicateRequest(slotRequest.getAllocationId())) { + LOG.debug("Ignoring a duplicate slot request with allocation id {}.", slotRequest.getAllocationId()); - sendSlotRequest(slot, request); + return false; } else { - LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " + - "AllocationID:{}, JobID:{}", allocationId, request.getJobId()); - Preconditions.checkState(rmServices != null, - "Attempted to allocate resources but no ResourceManagerServices set."); - rmServices.allocateResource(request.getResourceProfile()); - pendingSlotRequests.put(allocationId, request); + PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest); + + pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest); + + try { + internalRequestSlot(pendingSlotRequest); + } catch (ResourceManagerException e) { + // requesting the slot failed --> remove pending slot request + pendingSlotRequests.remove(slotRequest.getAllocationId()); + + throw new SlotManagerException("Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e); + } + + return true; } + } - return new RMSlotRequestRegistered(allocationId); + /** + * Cancels and removes a pending slot request with the given allocation id. If there is no such + * pending request, then nothing is done. 
+ * + * @param allocationId identifying the pending slot request + * @return True if a pending slot request was found; otherwise false + */ + public boolean unregisterSlotRequest(AllocationID allocationId) { + checkInit(); + + PendingSlotRequest pendingSlotRequest = pendingSlotRequests.remove(allocationId); + + if (null != pendingSlotRequest) { + cancelPendingSlotRequest(pendingSlotRequest); + + return true; + } else { + LOG.debug("No pending slot request with allocation id {} found.", allocationId); + + return false; + } } /** - * Notifies the SlotManager that a slot is available again after being allocated. - * @param slotID slot id of available slot + * Registers a new task manager at the slot manager. This will make the task managers slots + * known and, thus, available for allocation. + * + * @param taskExecutorConnection for the new task manager + * @param initialSlotReport for the new task manager */ - public void notifySlotAvailable(ResourceID resourceID, SlotID slotID) { - if (!allocationMap.isAllocated(slotID)) { - throw new IllegalStateException("Slot was not previously allocated but " + - "TaskManager reports it as available again"); + public void registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) { + checkInit(); + + // we identify task managers by their instance id + if (!taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) { + TaskManagerRegistration taskManagerRegistration = new TaskManagerRegistration(taskExecutorConnection); + taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), taskManagerRegistration); } - allocationMap.removeAllocation(slotID); - final Map<SlotID, ResourceSlot> slots = registeredSlots.get(resourceID); - ResourceSlot freeSlot = slots.get(slotID); - if (freeSlot == null) { - throw new IllegalStateException("Slot was not registered with SlotManager but " + - "TaskManager reported it to be available."); + + 
reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport); + } + + /** + * Unregisters the task manager identified by the given instance id and its associated slots + * from the slot manager. + * + * @param instanceId identifying the task manager to unregister + * @return True if there existed a registered task manager with the given instance id + */ + public boolean unregisterTaskManager(InstanceID instanceId) { + checkInit(); + + TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.remove(instanceId); + + if (null != taskManagerRegistration) { + removeSlots(taskManagerRegistration.getSlots()); + + taskManagerRegistration.cancelTimeout(); + + return true; + } else { + LOG.debug("There is no task manager registered with instance ID {}. Ignoring this message.", instanceId); + + return false; } - handleFreeSlot(freeSlot); } /** - * The slot request to TaskManager may be either failed by rpc communication (timeout, network error, etc.) - * or really rejected by TaskManager. We shall retry this request by: - * <ul> - * <li>1. verify and clear all the previous allocate information for this request - * <li>2. try to request slot again - * </ul> - * <p> - * This may cause some duplicate allocation, e.g. the slot request to TaskManager is successful but the response - * is lost somehow, so we may request a slot in another TaskManager, this causes two slots assigned to one request, - * but it can be taken care of by rejecting registration at JobManager. + * Reports the current slot allocations for a task manager identified by the given instance id. 
* - * @param originalRequest The original slot request - * @param slotId The target SlotID + * @param instanceId identifying the task manager for which to report the slot status + * @param slotReport containing the status for all of its slots + * @return true if the slot status has been updated successfully, otherwise false */ - void handleSlotRequestFailedAtTaskManager(final SlotRequest originalRequest, final SlotID slotId) { - final AllocationID originalAllocationId = originalRequest.getAllocationId(); - LOG.info("Slot request failed at TaskManager, SlotID:{}, AllocationID:{}, JobID:{}", - slotId, originalAllocationId, originalRequest.getJobId()); - - if (allocationMap.isAllocated(slotId)) { - final AllocationID expectedAllocationId = allocationMap.getAllocationID(slotId); - - // check whether we have an agreement on whom this slot belongs to - if (originalAllocationId.equals(expectedAllocationId)) { - LOG.info("De-allocate this request and retry"); - allocationMap.removeAllocation(expectedAllocationId); - pendingSlotRequests.put(originalRequest.getAllocationId(), originalRequest); - ResourceSlot slot = checkNotNull(getRegisteredSlot(slotId)); - // treat this slot as empty and retry with a different request - handleFreeSlot(slot); + public boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport) { + checkInit(); + + TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId); + + if (null != taskManagerRegistration) { + HashSet<SlotID> slotsToRemove = new HashSet<>(taskManagerRegistration.getSlots()); + boolean idle = true; + + for (SlotStatus slotStatus : slotReport) { + if (slotsToRemove.remove(slotStatus.getSlotID())) { + // slot which was already registered + updateSlot(slotStatus.getSlotID(), slotStatus.getAllocationID()); + } else { + // new slot + registerSlot( + instanceId, + slotStatus.getSlotID(), + slotStatus.getAllocationID(), + slotStatus.getResourceProfile(), + 
taskManagerRegistration.getTaskManagerConnection()); + } + + TaskManagerSlot slot = slots.get(slotStatus.getSlotID()); + + idle &= slot.isFree(); + } + + // remove the slots for which we haven't received a slot status message + removeSlots(slotsToRemove); + + if (idle) { + // no slot of this task manager is being used --> register timer to free this resource + registerTaskManagerTimeout(taskManagerRegistration); + } + + return true; + } else { + LOG.debug("Received slot report for unknown task manager with instance id {}. Ignoring this report.", instanceId); + + return false; + } + } + + /** + * Free the given slot from the given allocation. If the slot is still allocated by the given + * allocation id, then the slot will be marked as free and will be subject to new slot requests. + * + * @param slotId identifying the slot to free + * @param allocationId with which the slot is presumably allocated + */ + public void freeSlot(SlotID slotId, AllocationID allocationId) { + checkInit(); + + TaskManagerSlot slot = slots.get(slotId); + + if (null != slot) { + if (slot.isAllocated()) { + if (Objects.equals(allocationId, slot.getAllocationId())) { + // free the slot + slot.setAllocationId(null); + fulfilledSlotRequests.remove(allocationId); + + if (slot.isFree()) { + handleFreeSlot(slot); + } + + TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId()); + + if (null != taskManagerRegistration && !anySlotUsed(taskManagerRegistration.getSlots())) { + registerTaskManagerTimeout(taskManagerRegistration); + } + } else { + LOG.debug("Received request to free slot {} with expected allocation id {}, " + + "but actual allocation id {} differs. 
Ignoring the request.", slotId, allocationId, slot.getAllocationId()); + } } else { - LOG.error("Slot request failed for slot {} with allocation id {}:" + - " Allocation id did not match the expected allocation id {}.", - slotId, originalAllocationId, expectedAllocationId); + LOG.debug("Slot {} has not been allocated.", allocationId); } } else { - LOG.error("Slot request failed for slot {} with allocation id {}: " + - "Slot was not previously registered.", - slotId, originalAllocationId); + LOG.debug("Trying to free a slot {} which has not been registered. Ignoring this message.", slotId); } } + // --------------------------------------------------------------------------------------------- + // Behaviour methods + // --------------------------------------------------------------------------------------------- + /** - * Registers a TaskExecutor - * @param resourceID TaskExecutor's ResourceID - * @param registration TaskExecutor's registration - * @param slotReport TaskExecutor's free and allocated slots + * Finds a matching slot request for a given resource profile. If there is no such request, + * the method returns null. + * + * Note: If you want to change the behaviour of the slot manager wrt slot allocation and + * request fulfillment, then you should override this method. + * + * @param slotResourceProfile defining the resources of an available slot + * @return A matching slot request which can be deployed in a slot with the given resource + * profile. Null if there is no such slot request pending. 
*/ - public void registerTaskExecutor( - ResourceID resourceID, - TaskExecutorRegistration registration, - SlotReport slotReport) { + protected PendingSlotRequest findMatchingRequest(ResourceProfile slotResourceProfile) { - if (taskManagers.get(resourceID) != null) { - notifyTaskManagerFailure(resourceID); + for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) { + if (!pendingSlotRequest.isAssigned() && slotResourceProfile.isMatching(pendingSlotRequest.getResourceProfile())) { + return pendingSlotRequest; + } } - this.taskManagers.put(resourceID, registration); + return null; + } + + /** + * Finds a matching slot for a given resource profile. A matching slot has at least as many + * resources available as the given resource profile. If there is no such slot available, then + * the method returns null. + * + * Note: If you want to change the behaviour of the slot manager wrt slot allocation and + * request fulfillment, then you should override this method. + * + * @param requestResourceProfile specifying the resource requirements for the a slot request + * @return A matching slot which fulfills the given resource profile. Null if there is no such + * slot available. 
+ */ + protected TaskManagerSlot findMatchingSlot(ResourceProfile requestResourceProfile) { + Iterator<Map.Entry<SlotID, TaskManagerSlot>> iterator = freeSlots.entrySet().iterator(); + + while (iterator.hasNext()) { + TaskManagerSlot taskManagerSlot = iterator.next().getValue(); - for (SlotStatus slotStatus : slotReport.getSlotsStatus()) { - final SlotID slotId = slotStatus.getSlotID(); + // sanity check + Preconditions.checkState(taskManagerSlot.isFree()); - final TaskExecutorRegistration taskExecutorRegistration = taskManagers.get(slotId.getResourceID()); - if (taskExecutorRegistration == null) { - LOG.info("Received SlotStatus but ResourceID {} is unknown to the SlotManager", - slotId.getResourceID()); - return; + if (taskManagerSlot.getResourceProfile().isMatching(requestResourceProfile)) { + iterator.remove(); + return taskManagerSlot; } + } - final ResourceSlot slot = new ResourceSlot(slotId, slotStatus.getProfiler(), taskExecutorRegistration); + return null; + } - registerNewSlot(slot); - LOG.info("New slot appeared, SlotID:{}, AllocationID:{}", slotId, slotStatus.getAllocationID()); + // --------------------------------------------------------------------------------------------- + // Internal slot operations + // --------------------------------------------------------------------------------------------- - if (slotStatus.getAllocationID() != null) { - // slot in use, record this in bookkeeping - allocationMap.addAllocation(slotId, slotStatus.getAllocationID()); - } else { + /** + * Registers a slot for the given task manager at the slot manager. The task manager is + * identified by the given instance id and the slot is identified by the given slot id. The + * given resource profile defines the available resources for the slot. The task manager + * connection can be used to communicate with the task manager. 
+ * + * @param instanceId identifying the task manager on which the slot lives + * @param slotId identifying the slot on the task manager + * @param allocationId which is currently deployed in the slot + * @param resourceProfile of the slot + * @param taskManagerConnection to communicate with the remote task manager + */ + private void registerSlot( + InstanceID instanceId, + SlotID slotId, + AllocationID allocationId, + ResourceProfile resourceProfile, + TaskExecutorConnection taskManagerConnection) { + TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId); + + if (null != taskManagerRegistration) { + TaskManagerSlot slot = new TaskManagerSlot( + slotId, + resourceProfile, + taskManagerConnection, + allocationId); + + slots.put(slotId, slot); + + taskManagerRegistration.addSlot(slotId); + + if (slot.isFree()) { handleFreeSlot(slot); } + + if (slot.isAllocated()) { + fulfilledSlotRequests.put(slot.getAllocationId(), slotId); + } + } else { + LOG.debug("Trying to register slot for unknown task manager with instance id {}.", instanceId); } } /** - * Callback for TaskManager failures. In case that a TaskManager fails, we have to clean up all its slots. + * Updates a slot with the given allocation id. 
* - * @param resourceId The ResourceID of the TaskManager + * @param slotId to update + * @param allocationId specifying the current allocation of the slot */ - public void notifyTaskManagerFailure(final ResourceID resourceId) { - LOG.info("Resource:{} been notified failure", resourceId); - taskManagers.remove(resourceId); - final Map<SlotID, ResourceSlot> slotIdsToRemove = registeredSlots.remove(resourceId); - if (slotIdsToRemove != null) { - for (SlotID slotId : slotIdsToRemove.keySet()) { - LOG.info("Removing Slot: {} upon resource failure", slotId); - if (freeSlots.containsKey(slotId)) { - freeSlots.remove(slotId); - } else if (allocationMap.isAllocated(slotId)) { - allocationMap.removeAllocation(slotId); - } else { - LOG.error("BUG! {} is neither in free pool nor in allocated pool", slotId); + private void updateSlot(SlotID slotId, AllocationID allocationId) { + TaskManagerSlot slot = slots.get(slotId); + + if (null != slot) { + // we assume the given allocation id to be the ground truth (coming from the TM) + slot.setAllocationId(allocationId); + + if (null != allocationId) { + if (slot.hasPendingSlotRequest()){ + // we have a pending slot request --> check whether we have to reject it + PendingSlotRequest pendingSlotRequest = slot.getAssignedSlotRequest(); + + if (Objects.equals(pendingSlotRequest.getAllocationId(), allocationId)) { + // we can cancel the slot request because it has been fulfilled + cancelPendingSlotRequest(pendingSlotRequest); + + // remove the pending slot request, since it has been completed + pendingSlotRequests.remove(pendingSlotRequest.getAllocationId()); + } else { + // this will try to find a new slot for the request + rejectPendingSlotRequest( + pendingSlotRequest, + new Exception("Task manager reported slot " + slotId + " being already allocated.")); + } + + slot.setAssignedSlotRequest(null); + } + + fulfilledSlotRequests.put(allocationId, slotId); + + TaskManagerRegistration taskManagerRegistration = 
taskManagerRegistrations.get(slot.getInstanceId()); + + if (null != taskManagerRegistration) { + // disable any registered time out for the task manager + taskManagerRegistration.cancelTimeout(); } } + } else { + LOG.debug("Trying to update unknown slot with slot id {}.", slotId); } } - // ------------------------------------------------------------------------ - // internal behaviors - // ------------------------------------------------------------------------ - /** - * When we have a free slot, try to fulfill the pending request first. If any request can be fulfilled, - * record this allocation in bookkeeping and send slot request to TaskManager, else we just add this slot - * to the free pool. + * Tries to allocate a slot for the given slot request. If there is no slot available, the + * resource manager is informed to allocate more resources and a timeout for the request is + * registered. * - * @param freeSlot The free slot + * @param pendingSlotRequest to allocate a slot for + * @throws ResourceManagerException if the resource manager cannot allocate more resource */ - private void handleFreeSlot(final ResourceSlot freeSlot) { - SlotRequest chosenRequest = chooseRequestToFulfill(freeSlot, pendingSlotRequests); + private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException { + TaskManagerSlot taskManagerSlot = findMatchingSlot(pendingSlotRequest.getResourceProfile()); - if (chosenRequest != null) { - final AllocationID allocationId = chosenRequest.getAllocationId(); - final SlotRequest slotRequest = pendingSlotRequests.remove(allocationId); + if (taskManagerSlot != null) { + allocateSlot(taskManagerSlot, pendingSlotRequest); + } else { + final UUID timeoutIdentifier = UUID.randomUUID(); + final AllocationID allocationId = pendingSlotRequest.getAllocationId(); + + // register timeout for slot request + ScheduledFuture<?> timeoutFuture = scheduledExecutor.schedule(new Runnable() { + @Override + public void run() { + 
mainThreadExecutor.execute(new Runnable() { + @Override + public void run() { + timeoutSlotRequest(allocationId, timeoutIdentifier); + } + }); + } + }, slotRequestTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); - LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", freeSlot.getSlotId(), - allocationId, chosenRequest.getJobId()); - allocationMap.addAllocation(freeSlot.getSlotId(), allocationId); + pendingSlotRequest.registerTimeout(timeoutFuture, timeoutIdentifier); - sendSlotRequest(freeSlot, slotRequest); - } else { - freeSlots.put(freeSlot.getSlotId(), freeSlot); + resourceManagerActions.allocateResource(pendingSlotRequest.getResourceProfile()); } } - private void sendSlotRequest(final ResourceSlot freeSlot, final SlotRequest slotRequest) { + /** + * Allocates the given slot for the given slot request. This entails sending a registration + * message to the task manager and treating failures. + * + * @param taskManagerSlot to allocate for the given slot request + * @param pendingSlotRequest to allocate the given slot for + */ + private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pendingSlotRequest) { + TaskExecutorConnection taskExecutorConnection = taskManagerSlot.getTaskManagerConnection(); + TaskExecutorGateway gateway = taskExecutorConnection.getTaskExecutorGateway(); + + final CompletableFuture<Acknowledge> completableFuture = new FlinkCompletableFuture<>(); + final AllocationID allocationId = pendingSlotRequest.getAllocationId(); + final SlotID slotId = taskManagerSlot.getSlotId(); + + taskManagerSlot.setAssignedSlotRequest(pendingSlotRequest); + pendingSlotRequest.setRequestFuture(completableFuture); + + // RPC call to the task manager + Future<Acknowledge> requestFuture = gateway.requestSlot( + slotId, + pendingSlotRequest.getJobId(), + allocationId, + pendingSlotRequest.getTargetAddress(), + leaderId, + taskManagerRequestTimeout); + + requestFuture.handle(new BiFunction<Acknowledge, Throwable, Void>() { --- End diff 
--

This `BiFunction` seems to be an *identity* function. Can we just skip the `completableFuture` and define the proper handler directly on the `requestFuture`?

> Harden SlotManager
> ------------------
>
>                 Key: FLINK-5810
>                 URL: https://issues.apache.org/jira/browse/FLINK-5810
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination
>    Affects Versions: 1.3.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>
> Harden the {{SlotManager}} logic to better cope with lost messages.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)