StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r255477727
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java ########## @@ -0,0 +1,1324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import 
org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.util.clock.Clock; +import org.apache.flink.runtime.util.clock.SystemClock; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The slot pool serves slot request issued by {@link ExecutionGraph}. It will attempt to acquire new slots + * from the ResourceManager when it cannot serve a slot request. If no ResourceManager is currently available, + * or it gets a decline from the ResourceManager, or a request times out, it fails the slot request. The slot pool also + * holds all the slots that were offered to it and accepted, and can thus provides registered free slots even if the + * ResourceManager is down. 
The slots will only be released when they are useless, e.g. when the job is fully running + * but we still have some free slots. + * + * <p>All the allocation or the slot offering will be identified by self generated AllocationID, we will use it to + * eliminate ambiguities. + * + * <p>TODO : Make pending requests location preference aware + * TODO : Make pass location preferences to ResourceManager when sending a slot request + */ +public class SlotPoolImpl implements SlotPool, AllocatedSlotActions { + + protected final Logger log = LoggerFactory.getLogger(getClass()); + + /** The interval (in milliseconds) in which the SlotPool writes its slot distribution on debug level. */ + private static final int STATUS_LOG_INTERVAL_MS = 60_000; + + private final JobID jobId; + + /** All registered TaskManagers, slots will be accepted and used only if the resource is registered. */ + private final HashSet<ResourceID> registeredTaskManagers; + + /** The book-keeping of all allocated slots. */ + private final AllocatedSlots allocatedSlots; + + /** The book-keeping of all available slots. */ + private final AvailableSlots availableSlots; + + /** All pending requests waiting for slots. */ + private final DualKeyMap<SlotRequestId, AllocationID, PendingRequest> pendingRequests; + + /** The requests that are waiting for the resource manager to be connected. */ + private final HashMap<SlotRequestId, PendingRequest> waitingForResourceManager; + + /** Timeout for external request calls (e.g. to the ResourceManager or the TaskExecutor). */ + private final Time rpcTimeout; + + /** Timeout for releasing idle slots. */ + private final Time idleSlotTimeout; + + private final Clock clock; + + /** the fencing token of the job manager. */ + private JobMasterId jobMasterId; + + /** The gateway to communicate with resource manager. 
*/ + private ResourceManagerGateway resourceManagerGateway; + + private String jobManagerAddress; + + private ComponentMainThreadExecutor jmMainThreadScheduledExecutor; + + // ------------------------------------------------------------------------ + + @VisibleForTesting + public SlotPoolImpl(JobID jobId) { + this( + jobId, + SystemClock.getInstance(), + AkkaUtils.getDefaultTimeout(), + Time.milliseconds(JobManagerOptions.SLOT_IDLE_TIMEOUT.defaultValue())); + } + + public SlotPoolImpl( + JobID jobId, + Clock clock, + Time rpcTimeout, + Time idleSlotTimeout) { + + this.jobId = checkNotNull(jobId); + this.clock = checkNotNull(clock); + this.rpcTimeout = checkNotNull(rpcTimeout); + this.idleSlotTimeout = checkNotNull(idleSlotTimeout); + + this.registeredTaskManagers = new HashSet<>(16); + this.allocatedSlots = new AllocatedSlots(); + this.availableSlots = new AvailableSlots(); + this.pendingRequests = new DualKeyMap<>(16); + this.waitingForResourceManager = new HashMap<>(16); + + this.jobMasterId = null; + this.resourceManagerGateway = null; + this.jobManagerAddress = null; + + this.jmMainThreadScheduledExecutor = null; + } + + // ------------------------------------------------------------------------ + // Starting and Stopping + // ------------------------------------------------------------------------ + + /** + * Start the slot pool to accept RPC calls. + * + * @param jobMasterId The necessary leader id for running the job. 
+ * @param newJobManagerAddress for the slot requests which are sent to the resource manager + */ + public void start( + @Nonnull JobMasterId jobMasterId, + @Nonnull String newJobManagerAddress, + @Nonnull ComponentMainThreadExecutor jmMainThreadScheduledExecutor) throws Exception { + + this.jobMasterId = jobMasterId; + this.jobManagerAddress = newJobManagerAddress; + this.jmMainThreadScheduledExecutor = jmMainThreadScheduledExecutor; + + scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout); + + if (log.isDebugEnabled()) { + scheduleRunAsync(this::scheduledLogStatus, STATUS_LOG_INTERVAL_MS, TimeUnit.MILLISECONDS); + } + } + + /** + * Suspends this pool, meaning it has lost its authority to accept and distribute slots. + */ + @Override + public void suspend() { + + jmMainThreadScheduledExecutor.assertRunningInMainThread(); + + log.info("Suspending SlotPool."); + + // cancel all pending allocations --> we can request these slots + // again after we regained the leadership + Set<AllocationID> allocationIds = pendingRequests.keySetB(); + + for (AllocationID allocationId : allocationIds) { + resourceManagerGateway.cancelSlotRequest(allocationId); + } + + // do not accept any requests + jobMasterId = null; + resourceManagerGateway = null; + + // Clear (but not release!) the available slots. 
The TaskManagers should re-register them + // at the new leader JobManager/SlotPool + clear(); + } + + @Override + public void close() { + log.info("Stopping SlotPool."); + // cancel all pending allocations + Set<AllocationID> allocationIds = pendingRequests.keySetB(); + + for (AllocationID allocationId : allocationIds) { + resourceManagerGateway.cancelSlotRequest(allocationId); + } + + // release all registered slots by releasing the corresponding TaskExecutors + for (ResourceID taskManagerResourceId : registeredTaskManagers) { + final FlinkException cause = new FlinkException( + "Releasing TaskManager " + taskManagerResourceId + ", because of stopping of SlotPool"); + releaseTaskManagerInternal(taskManagerResourceId, cause); + } + + clear(); + } + + // ------------------------------------------------------------------------ + // Resource Manager Connection + // ------------------------------------------------------------------------ + + @Override + public void connectToResourceManager(@Nonnull ResourceManagerGateway resourceManagerGateway) { + this.resourceManagerGateway = checkNotNull(resourceManagerGateway); + + // work on all slots waiting for this connection + for (PendingRequest pendingRequest : waitingForResourceManager.values()) { + requestSlotFromResourceManager(resourceManagerGateway, pendingRequest); + } + + // all sent off + waitingForResourceManager.clear(); + } + + @Override + public void disconnectResourceManager() { + this.resourceManagerGateway = null; + } + + // ------------------------------------------------------------------------ + // Slot Allocation + // ------------------------------------------------------------------------ + + /** + * Requests a new slot with the given {@link ResourceProfile} from the ResourceManager. If there is + * currently not ResourceManager connected, then the request is stashed and send once a new + * ResourceManager is connected. 
+ * + * @param slotRequestId identifying the requested slot + * @param resourceProfile which the requested slot should fulfill + * @param timeout timeout before the slot allocation times out + * @return An {@link AllocatedSlot} future which is completed once the slot is offered to the {@link SlotPool} + */ + @Nonnull + private CompletableFuture<AllocatedSlot> requestNewAllocatedSlotInternal( Review comment: Hm, this class is separated into sections by the comments that were in the original code. Are you suggesting to remove this separation altogether or maybe just move public methods to the top in their section? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services