[ https://issues.apache.org/jira/browse/FLINK-5810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15928750#comment-15928750 ]
ASF GitHub Bot commented on FLINK-5810: --------------------------------------- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3394#discussion_r106507099 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java --- @@ -21,519 +21,897 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.clusterframework.types.ResourceSlot; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot; import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.concurrent.BiFunction; +import org.apache.flink.runtime.concurrent.CompletableFuture; import org.apache.flink.runtime.concurrent.Future; -import org.apache.flink.runtime.resourcemanager.ResourceManagerServices; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.resourcemanager.SlotRequest; -import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRegistered; -import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected; -import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply; -import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection; import 
org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.SlotStatus; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException; +import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; - -import static org.apache.flink.util.Preconditions.checkNotNull; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.CancellationException; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** - * SlotManager is responsible for receiving slot requests and do slot allocations. It allows to request - * slots from registered TaskManagers and issues container allocation requests in case of there are not - * enough available slots. Besides, it should sync its slot allocation with TaskManager's heartbeat. - * <p> - * The main operation principle of SlotManager is: - * <ul> - * <li>1. All slot allocation status should be synced with TaskManager, which is the ground truth.</li> - * <li>2. All slots that have registered must be tracked, either by free pool or allocated pool.</li> - * <li>3. All slot requests will be handled by best efforts, there is no guarantee that one request will be - * fulfilled in time or correctly allocated. Conflicts or timeout or some special error will happen, it should - * be handled outside SlotManager. SlotManager will make each decision based on the information it currently - * holds.</li> - * </ul> - * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>. 
+ * The slot manager is responsible for maintaining a view on all registered task manager slots, + * their allocation and all pending slot requests. Whenever a new slot is registered or an + * allocated slot is freed, then it tries to fulfill another pending slot request. Whenever there + * are not enough slots available the slot manager will notify the resource manager about it via + * {@link ResourceManagerActions#allocateResource(ResourceProfile)}. + * + * In order to free resources and avoid resource leaks, idling task managers (task managers whose + * slots are currently not used) and not fulfilled pending slot requests time out triggering their + * release and failure, respectively. */ -public abstract class SlotManager { +public class SlotManager implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class); + + /** Scheduled executor for timeouts */ + private final ScheduledExecutor scheduledExecutor; + + /** Timeout for slot requests to the task manager */ + private final Time taskManagerRequestTimeout; + + /** Timeout after which an allocation is discarded */ + private final Time slotRequestTimeout; + + /** Timeout after which an unused TaskManager is released */ + private final Time taskManagerTimeout; + + /** Map for all registered slots */ + private final HashMap<SlotID, TaskManagerSlot> slots; - protected final Logger LOG = LoggerFactory.getLogger(getClass()); + /** Index of all currently free slots */ + private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots; - /** The Resource allocation provider */ - protected final ResourceManagerServices rmServices; + /** All currently registered task managers */ + private final HashMap<InstanceID, TaskManagerRegistration> taskManagerRegistrations; - /** All registered task managers with ResourceID and gateway. 
*/ - private final Map<ResourceID, TaskExecutorRegistration> taskManagers; + /** Map of fulfilled and active allocations for request deduplication purposes */ + private final HashMap<AllocationID, SlotID> fulfilledSlotRequests; - /** All registered slots, including free and allocated slots */ - private final Map<ResourceID, Map<SlotID, ResourceSlot>> registeredSlots; + /** Map of pending/unfulfilled slot allocation requests */ + private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests; - /** All pending slot requests, waiting available slots to fulfil */ - private final Map<AllocationID, SlotRequest> pendingSlotRequests; + /** Leader id of the containing component */ + private UUID leaderId; - /** All free slots that can be used to be allocated */ - private final Map<SlotID, ResourceSlot> freeSlots; + /** Executor for future callbacks which have to be "synchronized" */ + private Executor mainThreadExecutor; - /** All allocations, we can lookup allocations either by SlotID or AllocationID */ - private final AllocationMap allocationMap; + /** Callbacks for resource (de-)allocations */ + private ResourceManagerActions resourceManagerActions; - private final Time timeout; + /** True iff the component has been started */ + private boolean started; - public SlotManager(ResourceManagerServices rmServices) { - this.rmServices = checkNotNull(rmServices); - this.registeredSlots = new HashMap<>(16); - this.pendingSlotRequests = new LinkedHashMap<>(16); - this.freeSlots = new HashMap<>(16); - this.allocationMap = new AllocationMap(); - this.taskManagers = new HashMap<>(); - this.timeout = Time.seconds(10); + public SlotManager( + ScheduledExecutor scheduledExecutor, + Time taskManagerRequestTimeout, + Time slotRequestTimeout, + Time taskManagerTimeout) { + this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor); + this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout); + this.slotRequestTimeout = 
Preconditions.checkNotNull(slotRequestTimeout); + this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout); + + slots = new HashMap<>(16); + freeSlots = new LinkedHashMap<>(16); + taskManagerRegistrations = new HashMap<>(4); + fulfilledSlotRequests = new HashMap<>(16); + pendingSlotRequests = new HashMap<>(16); + + leaderId = null; + resourceManagerActions = null; + started = false; } - // ------------------------------------------------------------------------ - // slot managements - // ------------------------------------------------------------------------ + // --------------------------------------------------------------------------------------------- + // Component lifecycle methods + // --------------------------------------------------------------------------------------------- /** - * Request a slot with requirements, we may either fulfill the request or pending it. Trigger container - * allocation if we don't have enough resource. If we have free slot which can match the request, record - * this allocation and forward the request to TaskManager through ResourceManager (we want this done by - * RPC's main thread to avoid race condition). + * Starts the slot manager with the given leader id and resource manager actions. * - * @param request The detailed request of the slot - * @return RMSlotRequestRegistered The confirmation message to be send to the caller + * @param newLeaderId to use for communication with the task managers + * @param newResourceManagerActions to use for resource (de-)allocations + */ + public void start(UUID newLeaderId, Executor newMainThreadExecutor, ResourceManagerActions newResourceManagerActions) { + leaderId = Preconditions.checkNotNull(newLeaderId); + mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor); + resourceManagerActions = Preconditions.checkNotNull(newResourceManagerActions); + + started = true; + } + + /** + * Suspends the component. This clears the internal state of the slot manager. 
*/ - public RMSlotRequestRegistered requestSlot(final SlotRequest request) { - final AllocationID allocationId = request.getAllocationId(); - if (isRequestDuplicated(request)) { - LOG.warn("Duplicated slot request, AllocationID:{}", allocationId); - return new RMSlotRequestRegistered(allocationId); + public void suspend() { + for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) { + cancelPendingSlotRequest(pendingSlotRequest); } - // try to fulfil the request with current free slots - final ResourceSlot slot = chooseSlotToUse(request, freeSlots); - if (slot != null) { - LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(), - allocationId, request.getJobId()); + pendingSlotRequests.clear(); + + HashSet<InstanceID> registeredTaskManagers = new HashSet<>(taskManagerRegistrations.keySet()); --- End diff -- If the copy of the collection is just for concurrent iteration, I would make it an `ArrayList`. Builds faster and iterates faster. > Harden SlotManager > ------------------ > > Key: FLINK-5810 > URL: https://issues.apache.org/jira/browse/FLINK-5810 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination > Affects Versions: 1.3.0 > Reporter: Till Rohrmann > Assignee: Till Rohrmann > > Harden the {{SlotManager}} logic to better cope with lost messages. -- This message was sent by Atlassian JIRA (v6.3.15#6346)