[ https://issues.apache.org/jira/browse/FLINK-4347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15430988#comment-15430988 ]
ASF GitHub Bot commented on FLINK-4347: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2388#discussion_r75698545 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/SlotManager.java --- @@ -0,0 +1,519 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.clusterframework; + +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.ResourceSlot; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest; +import org.apache.flink.runtime.rpc.taskexecutor.SlotReport; +import org.apache.flink.runtime.rpc.taskexecutor.SlotStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * SlotManager is responsible for receiving slot requests and do slot allocations. It allows to request + * slots from registered TaskManagers and issues container allocation requests in case of there are not + * enough available slots. Besides, it should sync its slot allocation with TaskManager's heartbeat. + * <p> + * The main operation principle of SlotManager is: + * <ul> + * <li>1. All slot allocation status should be synced with TaskManager, which is the ground truth.</li> + * <li>2. All slots that have registered must be tracked, either by free pool or allocated pool.</li> + * <li>3. All slot requests will be handled by best efforts, there is no guarantee that one request will be + * fulfilled in time or correctly allocated. Conflicts or timeout or some special error will happen, it should + * be handled outside SlotManager. 
SlotManager will make each decision based on the information it currently + * holds.</li> + * </ul> + * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>. + */ +public abstract class SlotManager { + + private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class); + + /** Gateway to communicate with ResourceManager */ + private final ResourceManagerGateway resourceManagerGateway; + + /** All registered slots, including free and allocated slots */ + private final Map<ResourceID, Map<SlotID, ResourceSlot>> registeredSlots; + + /** All pending slot requests, waiting available slots to fulfil */ + private final Map<AllocationID, SlotRequest> pendingSlotRequests; + + /** All free slots that can be used to be allocated */ + private final Map<SlotID, ResourceSlot> freeSlots; + + /** All allocations, we can lookup allocations either by SlotID or AllocationID */ + private final AllocationMap allocationMap; + + public SlotManager(ResourceManagerGateway resourceManagerGateway) { + this.resourceManagerGateway = resourceManagerGateway; + this.registeredSlots = new HashMap<>(); + this.pendingSlotRequests = new LinkedHashMap<>(); + this.freeSlots = new HashMap<>(); + this.allocationMap = new AllocationMap(); + } + + // ------------------------------------------------------------------------ + // slot managements + // ------------------------------------------------------------------------ + + /** + * Request a slot with requirements, we may either fulfill the request or pending it. Trigger container + * allocation if we don't have enough resource. If we have free slot which can match the request, record + * this allocation and forward the request to TaskManager through ResourceManager (we want this done by + * RPC's main thread to avoid race condition). 
+ * + * @param request The detailed request of the slot + */ + public void requestSlot(final SlotRequest request) { + if (isRequestDuplicated(request)) { + LOG.warn("Duplicated slot request, AllocationID:{}", request.getAllocationId()); + return; + } + + // try to fulfil the request with current free slots + ResourceSlot slot = chooseSlotToUse(request, freeSlots); + if (slot != null) { + LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(), + request.getAllocationId(), request.getJobId()); + + // record this allocation in bookkeeping + allocationMap.addAllocation(slot.getSlotId(), request.getAllocationId()); + + // remove selected slot from free pool + freeSlots.remove(slot.getSlotId()); + + // TODO: send slot request to TaskManager + } else { + LOG.info("Does not have enough slot now, pending request and try to allocate new container, " + + "AllocationID:{}, JobID:{}", request.getAllocationId(), request.getJobId()); + allocateContainer(request.getResourceProfile()); + pendingSlotRequests.put(request.getAllocationId(), request); + } + } + + /** + * Sync slot status with TaskManager's SlotReport. + */ + public void updateSlotStatus(final SlotReport slotReport) { + for (SlotStatus slotStatus : slotReport.getSlotsStatus()) { + updateSlotStatus(slotStatus); + } + } + + /** + * The slot request to TaskManager may be either failed by rpc communication(timeout, network error, etc.) + * or really rejected by TaskManager. We shall retry this request by: + * <ul> + * <li>1. verify and clear all the previous allocate information for this request + * <li>2. try to request slot again + * </ul> + * <p> + * This may cause some duplicate allocation, e.g. the slot request to TaskManager is successful but the response + * is lost somehow, so we may request a slot in another TaskManager, this causes two slots assigned to one request, + * but it can be taken care of by rejecting registration at JobManager. 
+ * + * @param originalRequest The original slot request + * @param slotId The target SlotID + */ + public void handleSlotRequestFailedAtTaskManager(final SlotRequest originalRequest, final SlotID slotId) { + final AllocationID originalAllocationId = originalRequest.getAllocationId(); + LOG.info("Slot request failed at TaskManager, SlotID:{}, AllocationID:{}, JobID:{}", + slotId, originalAllocationId, originalRequest.getJobId()); + + // verify the allocation info before we do anything + if (freeSlots.containsKey(slotId)) { + // this slot is currently empty, no need to de-allocate it from our allocations + LOG.info("Original slot is somehow empty, retrying this request"); + + // before retry, we should double check whether this request was allocated by some other ways + if (!allocationMap.isAllocated(originalAllocationId)) { + requestSlot(originalRequest); + } else { + LOG.info("The failed request is somehow been allocated, SlotID:{}", + allocationMap.getSlotID(originalAllocationId)); + } + } else if (allocationMap.isAllocated(slotId)) { + final AllocationID currentAllocationId = allocationMap.getAllocationID(slotId); + + // check whether we have an agreement on whom this slot belongs to + if (originalAllocationId.equals(currentAllocationId)) { + LOG.info("De-allocate this request and retry"); + allocationMap.removeAllocation(currentAllocationId); + + // put this slot back to free pool + ResourceSlot slot = checkNotNull(getRegisteredSlot(slotId)); + freeSlots.put(slotId, slot); + + // retry the request + requestSlot(originalRequest); + } else { + // the slot is taken by someone else, no need to de-allocate it from our allocations + LOG.info("Original slot is taken by someone else, current AllocationID:{}", currentAllocationId); + + // before retry, we should double check whether this request was allocated by some other ways + if (!allocationMap.isAllocated(originalAllocationId)) { + requestSlot(originalRequest); + } else { + LOG.info("The failed request is somehow 
been allocated, SlotID:{}", + allocationMap.getSlotID(originalAllocationId)); + } + } + } else { + LOG.error("BUG! {} is neither in free pool nor in allocated pool", slotId); + } + } + + /** + * TaskManager been notified failure, we should clean up all its slots. + * + * @param resourceId The ResourceID of the TaskManager + */ + public void notifyTaskManagerFailure(final ResourceID resourceId) { + LOG.info("Resource:{} been notified failure", resourceId); + final Map<SlotID, ResourceSlot> slotIdsToRemove = registeredSlots.remove(resourceId); + if (slotIdsToRemove == null) { + return; + } + for (SlotID slotId : slotIdsToRemove.keySet()) { + LOG.info("Removing Slot:{} upon resource failure", slotId); + if (freeSlots.containsKey(slotId)) { + freeSlots.remove(slotId); + } else if (allocationMap.isAllocated(slotId)) { + allocationMap.removeAllocation(slotId); + } else { + LOG.error("BUG! {} is neither in free pool nor in allocated pool", slotId); + } + } + } + + // ------------------------------------------------------------------------ + // internal behaviors + // ------------------------------------------------------------------------ + + /** + * Update slot status based on TaskManager's report. There are mainly two situations when we receive the report: + * <ul> + * <li>1. The slot is newly registered.</li> + * <li>2. The slot has registered, it contains its current status.</li> + * </ul> + * <p> + * Regarding 1: It's fairly simple, we just record this slot's status, and trigger schedule if sot is empty. --- End diff -- Typo: slot > Implement SlotManager core > -------------------------- > > Key: FLINK-4347 > URL: https://issues.apache.org/jira/browse/FLINK-4347 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management > Reporter: Kurt Young > Assignee: Kurt Young > > The slot manager is responsible to maintain the list of slot requests and > slot allocations. 
It allows requesting slots from the registered > TaskExecutors and issues container allocation requests in case there are > not enough available resources. -- This message was sent by Atlassian JIRA (v6.3.4#6332)