StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r255462112
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java ########## @@ -0,0 +1,1324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import 
org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.util.clock.Clock; +import org.apache.flink.runtime.util.clock.SystemClock; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The slot pool serves slot request issued by {@link ExecutionGraph}. It will attempt to acquire new slots + * from the ResourceManager when it cannot serve a slot request. If no ResourceManager is currently available, + * or it gets a decline from the ResourceManager, or a request times out, it fails the slot request. The slot pool also + * holds all the slots that were offered to it and accepted, and can thus provides registered free slots even if the + * ResourceManager is down. 
The slots will only be released when they are useless, e.g. when the job is fully running + * but we still have some free slots. + * + * <p>All the allocation or the slot offering will be identified by self generated AllocationID, we will use it to + * eliminate ambiguities. + * + * <p>TODO : Make pending requests location preference aware + * TODO : Make pass location preferences to ResourceManager when sending a slot request + */ +public class SlotPoolImpl implements SlotPool, AllocatedSlotActions { + + protected final Logger log = LoggerFactory.getLogger(getClass()); + + /** The interval (in milliseconds) in which the SlotPool writes its slot distribution on debug level. */ + private static final int STATUS_LOG_INTERVAL_MS = 60_000; + + private final JobID jobId; + + /** All registered TaskManagers, slots will be accepted and used only if the resource is registered. */ + private final HashSet<ResourceID> registeredTaskManagers; + + /** The book-keeping of all allocated slots. */ + private final AllocatedSlots allocatedSlots; + + /** The book-keeping of all available slots. */ + private final AvailableSlots availableSlots; + + /** All pending requests waiting for slots. */ + private final DualKeyMap<SlotRequestId, AllocationID, PendingRequest> pendingRequests; + + /** The requests that are waiting for the resource manager to be connected. */ + private final HashMap<SlotRequestId, PendingRequest> waitingForResourceManager; + + /** Timeout for external request calls (e.g. to the ResourceManager or the TaskExecutor). */ + private final Time rpcTimeout; + + /** Timeout for releasing idle slots. */ + private final Time idleSlotTimeout; + + private final Clock clock; + + /** the fencing token of the job manager. */ + private JobMasterId jobMasterId; + + /** The gateway to communicate with resource manager. 
*/ + private ResourceManagerGateway resourceManagerGateway; + + private String jobManagerAddress; + + private ComponentMainThreadExecutor jmMainThreadScheduledExecutor; + + // ------------------------------------------------------------------------ + + @VisibleForTesting + public SlotPoolImpl(JobID jobId) { + this( + jobId, + SystemClock.getInstance(), + AkkaUtils.getDefaultTimeout(), + Time.milliseconds(JobManagerOptions.SLOT_IDLE_TIMEOUT.defaultValue())); + } + + public SlotPoolImpl( + JobID jobId, + Clock clock, + Time rpcTimeout, + Time idleSlotTimeout) { + + this.jobId = checkNotNull(jobId); + this.clock = checkNotNull(clock); + this.rpcTimeout = checkNotNull(rpcTimeout); + this.idleSlotTimeout = checkNotNull(idleSlotTimeout); + + this.registeredTaskManagers = new HashSet<>(16); + this.allocatedSlots = new AllocatedSlots(); + this.availableSlots = new AvailableSlots(); + this.pendingRequests = new DualKeyMap<>(16); + this.waitingForResourceManager = new HashMap<>(16); + + this.jobMasterId = null; + this.resourceManagerGateway = null; + this.jobManagerAddress = null; + + this.jmMainThreadScheduledExecutor = null; + } + + // ------------------------------------------------------------------------ + // Starting and Stopping + // ------------------------------------------------------------------------ + + /** + * Start the slot pool to accept RPC calls. + * + * @param jobMasterId The necessary leader id for running the job. 
+ * @param newJobManagerAddress for the slot requests which are sent to the resource manager + */ + public void start( + @Nonnull JobMasterId jobMasterId, + @Nonnull String newJobManagerAddress, + @Nonnull ComponentMainThreadExecutor jmMainThreadScheduledExecutor) throws Exception { + + this.jobMasterId = jobMasterId; + this.jobManagerAddress = newJobManagerAddress; + this.jmMainThreadScheduledExecutor = jmMainThreadScheduledExecutor; + + scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout); + + if (log.isDebugEnabled()) { + scheduleRunAsync(this::scheduledLogStatus, STATUS_LOG_INTERVAL_MS, TimeUnit.MILLISECONDS); + } + } + + /** + * Suspends this pool, meaning it has lost its authority to accept and distribute slots. + */ + @Override + public void suspend() { + + jmMainThreadScheduledExecutor.assertRunningInMainThread(); + + log.info("Suspending SlotPool."); + + // cancel all pending allocations --> we can request these slots + // again after we regained the leadership + Set<AllocationID> allocationIds = pendingRequests.keySetB(); + + for (AllocationID allocationId : allocationIds) { + resourceManagerGateway.cancelSlotRequest(allocationId); + } + + // do not accept any requests + jobMasterId = null; + resourceManagerGateway = null; + + // Clear (but not release!) the available slots. 
The TaskManagers should re-register them + // at the new leader JobManager/SlotPool + clear(); + } + + @Override + public void close() { + log.info("Stopping SlotPool."); + // cancel all pending allocations + Set<AllocationID> allocationIds = pendingRequests.keySetB(); + + for (AllocationID allocationId : allocationIds) { + resourceManagerGateway.cancelSlotRequest(allocationId); + } + + // release all registered slots by releasing the corresponding TaskExecutors + for (ResourceID taskManagerResourceId : registeredTaskManagers) { + final FlinkException cause = new FlinkException( + "Releasing TaskManager " + taskManagerResourceId + ", because of stopping of SlotPool"); + releaseTaskManagerInternal(taskManagerResourceId, cause); + } + + clear(); + } + + // ------------------------------------------------------------------------ + // Resource Manager Connection + // ------------------------------------------------------------------------ + + @Override + public void connectToResourceManager(@Nonnull ResourceManagerGateway resourceManagerGateway) { + this.resourceManagerGateway = checkNotNull(resourceManagerGateway); + + // work on all slots waiting for this connection + for (PendingRequest pendingRequest : waitingForResourceManager.values()) { + requestSlotFromResourceManager(resourceManagerGateway, pendingRequest); + } + + // all sent off + waitingForResourceManager.clear(); + } + + @Override + public void disconnectResourceManager() { + this.resourceManagerGateway = null; + } + + // ------------------------------------------------------------------------ + // Slot Allocation + // ------------------------------------------------------------------------ + + /** + * Requests a new slot with the given {@link ResourceProfile} from the ResourceManager. If there is + * currently not ResourceManager connected, then the request is stashed and send once a new + * ResourceManager is connected. 
+ * + * @param slotRequestId identifying the requested slot + * @param resourceProfile which the requested slot should fulfill + * @param timeout timeout before the slot allocation times out + * @return An {@link AllocatedSlot} future which is completed once the slot is offered to the {@link SlotPool} + */ + @Nonnull + private CompletableFuture<AllocatedSlot> requestNewAllocatedSlotInternal( + @Nonnull SlotRequestId slotRequestId, + @Nonnull ResourceProfile resourceProfile, + @Nonnull Time timeout) { + + jmMainThreadScheduledExecutor.assertRunningInMainThread(); + + CompletableFuture<AllocatedSlot> timeoutFuture = new CompletableFuture<>(); + // register request timeout + FutureUtils.orTimeout(timeoutFuture, timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + + CompletableFuture<AllocatedSlot> pendingRequestFuture = timeoutFuture.handleAsync( + (AllocatedSlot as, Throwable t) -> { + if (t != null) { + throw new CompletionException(t); + } + return as; + }, jmMainThreadScheduledExecutor); + + final PendingRequest pendingRequest = new PendingRequest( + slotRequestId, + resourceProfile, + pendingRequestFuture); + + pendingRequestFuture.whenComplete( + (AllocatedSlot ignored, Throwable throwable) -> { + if (ExceptionUtils.findThrowable(throwable, TimeoutException.class).isPresent()) { + timeoutPendingSlotRequest(slotRequestId); + } + }); + + if (resourceManagerGateway == null) { + stashRequestWaitingForResourceManager(pendingRequest); + } else { + requestSlotFromResourceManager(resourceManagerGateway, pendingRequest); + } + + return pendingRequest.getAllocatedSlotFuture(); + } + + private void requestSlotFromResourceManager( + final ResourceManagerGateway resourceManagerGateway, + final PendingRequest pendingRequest) { + + checkNotNull(resourceManagerGateway); + checkNotNull(pendingRequest); + + log.info("Requesting new slot [{}] and profile {} from resource manager.", pendingRequest.getSlotRequestId(), pendingRequest.getResourceProfile()); + + final AllocationID 
allocationId = new AllocationID(); + + pendingRequests.put(pendingRequest.getSlotRequestId(), allocationId, pendingRequest); + + pendingRequest.getAllocatedSlotFuture().whenComplete( + (AllocatedSlot allocatedSlot, Throwable throwable) -> { + if (throwable != null || !allocationId.equals(allocatedSlot.getAllocationId())) { + // cancel the slot request if there is a failure or if the pending request has + // been completed with another allocated slot + resourceManagerGateway.cancelSlotRequest(allocationId); + } + }); + + CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot( + jobMasterId, + new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress), + rpcTimeout); + + // on failure, fail the request future + rmResponse.whenCompleteAsync( + (Acknowledge ignored, Throwable failure) -> { + if (failure != null) { + slotRequestToResourceManagerFailed(pendingRequest.getSlotRequestId(), failure); + } + }, + jmMainThreadScheduledExecutor); + } + + private void slotRequestToResourceManagerFailed(SlotRequestId slotRequestID, Throwable failure) { + PendingRequest request = pendingRequests.removeKeyA(slotRequestID); + if (request != null) { + request.getAllocatedSlotFuture().completeExceptionally(new NoResourceAvailableException( + "No pooled slot available and request to ResourceManager for new slot failed", failure)); + } else { + if (log.isDebugEnabled()) { + log.debug("Unregistered slot request [{}] failed.", slotRequestID, failure); + } + } + } + + private void stashRequestWaitingForResourceManager(final PendingRequest pendingRequest) { + + log.info("Cannot serve slot request, no ResourceManager connected. 
" + + "Adding as pending request [{}]", pendingRequest.getSlotRequestId()); + + waitingForResourceManager.put(pendingRequest.getSlotRequestId(), pendingRequest); + } + + // ------------------------------------------------------------------------ + // Slot releasing & offering + // ------------------------------------------------------------------------ + + @Override + public void releaseSlot(@Nonnull SlotRequestId slotRequestId, @Nullable Throwable cause) { + + jmMainThreadScheduledExecutor.assertRunningInMainThread(); + + log.debug("Releasing slot [{}] because: {}", slotRequestId, cause != null ? cause.getMessage() : "null"); + releaseSingleSlot(slotRequestId, cause); + } + + private void releaseSingleSlot(SlotRequestId slotRequestId, Throwable cause) { + final PendingRequest pendingRequest = removePendingRequest(slotRequestId); + + if (pendingRequest != null) { + failPendingRequest(pendingRequest, new FlinkException("Pending slot request with " + slotRequestId + " has been released.")); + } else { + final AllocatedSlot allocatedSlot = allocatedSlots.remove(slotRequestId); + + if (allocatedSlot != null) { + allocatedSlot.releasePayload(cause); + tryFulfillSlotRequestOrMakeAvailable(allocatedSlot); + } else { + log.debug("There is no allocated slot [{}]. Ignoring the release slot request.", slotRequestId); + } + } + } + + /** + * Checks whether there exists a pending request with the given slot request id and removes it + * from the internal data structures. 
+ * + * @param requestId identifying the pending request + * @return pending request if there is one, otherwise null + */ + @Nullable + private PendingRequest removePendingRequest(SlotRequestId requestId) { + PendingRequest result = waitingForResourceManager.remove(requestId); + + if (result != null) { + // sanity check + assert !pendingRequests.containsKeyA(requestId) : "A pending requests should only be part of either " + + "the pendingRequests or waitingForResourceManager but not both."; + + return result; + } else { + return pendingRequests.removeKeyA(requestId); + } + } + + private void failPendingRequest(PendingRequest pendingRequest, Exception e) { + checkNotNull(pendingRequest); + checkNotNull(e); + + if (!pendingRequest.getAllocatedSlotFuture().isDone()) { + log.info("Failing pending slot request [{}]: {}", pendingRequest.getSlotRequestId(), e.getMessage()); + pendingRequest.getAllocatedSlotFuture().completeExceptionally(e); + } + } + + @Override + public Optional<AllocatedSlotContext> allocateAvailableSlot( + @Nonnull SlotRequestId slotRequestId, + @Nonnull AllocationID allocationID) { + + jmMainThreadScheduledExecutor.assertRunningInMainThread(); + + AllocatedSlot allocatedSlot = availableSlots.tryRemove(allocationID); + if (allocatedSlot != null) { + allocatedSlots.add(slotRequestId, allocatedSlot); + return Optional.of(allocatedSlot); + } else { + return Optional.empty(); + } + } + + @Nonnull + @Override + public CompletableFuture<AllocatedSlotContext> requestNewAllocatedSlot( + @Nonnull SlotRequestId slotRequestId, + @Nonnull ResourceProfile resourceProfile, + Time timeout) { + + return requestNewAllocatedSlotInternal(slotRequestId, resourceProfile, timeout) + .thenApply((Function.identity())); + } + + @Override + @Nonnull + public Collection<SlotInfo> getAvailableSlotsInformation() { + return availableSlots.listSlotInfo(); + } + + /** + * Tries to fulfill with the given allocated slot a pending slot request or add the + * allocated slot to the set of 
available slots if no matching request is available. + * + * @param allocatedSlot which shall be returned + */ + private void tryFulfillSlotRequestOrMakeAvailable(AllocatedSlot allocatedSlot) { + Preconditions.checkState(!allocatedSlot.isUsed(), "Provided slot is still in use."); + + final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot); + + if (pendingRequest != null) { + log.debug("Fulfilling pending slot request [{}] early with returned slot [{}]", + pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId()); + + allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot); + pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot); + } else { + log.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId()); + availableSlots.add(allocatedSlot, clock.relativeTimeMillis()); + } + } + + private PendingRequest pollMatchingPendingRequest(final AllocatedSlot slot) { + final ResourceProfile slotResources = slot.getResourceProfile(); + + // try the requests sent to the resource manager first + for (PendingRequest request : pendingRequests.values()) { + if (slotResources.isMatching(request.getResourceProfile())) { + pendingRequests.removeKeyA(request.getSlotRequestId()); + return request; + } + } + + // try the requests waiting for a resource manager connection next + for (PendingRequest request : waitingForResourceManager.values()) { + if (slotResources.isMatching(request.getResourceProfile())) { + waitingForResourceManager.remove(request.getSlotRequestId()); + return request; + } + } + + // no request pending, or no request matches + return null; + } + + @Override + public Collection<SlotOffer> offerSlots( + TaskManagerLocation taskManagerLocation, + TaskManagerGateway taskManagerGateway, + Collection<SlotOffer> offers) { + + ArrayList<SlotOffer> result = new ArrayList<>(offers.size()); + + for (SlotOffer offer : offers) { + if (offerSlot( + taskManagerLocation, + taskManagerGateway, + 
offer)) { + + result.add(offer); + } + } + + return result; + } + + /** + * Slot offering by TaskExecutor with AllocationID. The AllocationID is originally generated by this pool and + * transfer through the ResourceManager to TaskManager. We use it to distinguish the different allocation + * we issued. Slot offering may be rejected if we find something mismatching or there is actually no pending + * request waiting for this slot (maybe fulfilled by some other returned slot). + * + * @param taskManagerLocation location from where the offer comes from + * @param taskManagerGateway TaskManager gateway + * @param slotOffer the offered slot + * @return True if we accept the offering + */ + @Override + public boolean offerSlot( + final TaskManagerLocation taskManagerLocation, + final TaskManagerGateway taskManagerGateway, + final SlotOffer slotOffer) { + + jmMainThreadScheduledExecutor.assertRunningInMainThread(); + + // check if this TaskManager is valid + final ResourceID resourceID = taskManagerLocation.getResourceID(); + final AllocationID allocationID = slotOffer.getAllocationId(); + + if (!registeredTaskManagers.contains(resourceID)) { + log.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}", + slotOffer.getAllocationId(), taskManagerLocation); + return false; + } + + // check whether we have already using this slot + AllocatedSlot existingSlot; + if ((existingSlot = allocatedSlots.get(allocationID)) != null || + (existingSlot = availableSlots.get(allocationID)) != null) { + + // we need to figure out if this is a repeated offer for the exact same slot, + // or another offer that comes from a different TaskManager after the ResourceManager + // re-tried the request + + // we write this in terms of comparing slot IDs, because the Slot IDs are the identifiers of + // the actual slots on the TaskManagers + // Note: The slotOffer should have the SlotID + final SlotID existingSlotId = existingSlot.getSlotId(); + final SlotID newSlotId = new 
SlotID(taskManagerLocation.getResourceID(), slotOffer.getSlotIndex()); + + if (existingSlotId.equals(newSlotId)) { + log.info("Received repeated offer for slot [{}]. Ignoring.", allocationID); + + // return true here so that the sender will get a positive acknowledgement to the retry + // and mark the offering as a success + return true; + } else { + // the allocation has been fulfilled by another slot, reject the offer so the task executor + // will offer the slot to the resource manager + return false; + } + } + + final AllocatedSlot allocatedSlot = new AllocatedSlot( + allocationID, + taskManagerLocation, + slotOffer.getSlotIndex(), + slotOffer.getResourceProfile(), + taskManagerGateway); + + // check whether we have request waiting for this slot + PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID); + if (pendingRequest != null) { + // we were waiting for this! + allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot); + + if (!pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot)) { + // we could not complete the pending slot future --> try to fulfill another pending request + allocatedSlots.remove(pendingRequest.getSlotRequestId()); + tryFulfillSlotRequestOrMakeAvailable(allocatedSlot); + } else { + log.debug("Fulfilled slot request [{}] with allocated slot [{}].", pendingRequest.getSlotRequestId(), allocationID); + } + } + else { + // we were actually not waiting for this: + // - could be that this request had been fulfilled + // - we are receiving the slots from TaskManagers after becoming leaders + tryFulfillSlotRequestOrMakeAvailable(allocatedSlot); + } + + // we accepted the request in any case. 
slot will be released after it idled for + // too long and timed out + return true; + } + + + // TODO - periodic (every minute or so) catch slots that were lost (check all slots, if they have any task active) + + // TODO - release slots that were not used to the resource manager + + // ------------------------------------------------------------------------ + // Error Handling + // ------------------------------------------------------------------------ + + /** + * Fail the specified allocation and release the corresponding slot if we have one. + * This may triggered by JobManager when some slot allocation failed with rpcTimeout. + * Or this could be triggered by TaskManager, when it finds out something went wrong with the slot, + * and decided to take it back. + * + * @param allocationID Represents the allocation which should be failed + * @param cause The cause of the failure + * @return Optional task executor if it has no more slots registered + */ + @Override + public Optional<ResourceID> failAllocation(final AllocationID allocationID, final Exception cause) { + + jmMainThreadScheduledExecutor.assertRunningInMainThread(); + + final PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID); + if (pendingRequest != null) { + // request was still pending + failPendingRequest(pendingRequest, cause); + return Optional.empty(); + } + else { + return tryFailingAllocatedSlot(allocationID, cause); + } + + // TODO: add some unit tests when the previous two are ready, the allocation may failed at any phase + } + + private Optional<ResourceID> tryFailingAllocatedSlot(AllocationID allocationID, Exception cause) { + AllocatedSlot allocatedSlot = availableSlots.tryRemove(allocationID); + + if (allocatedSlot == null) { + allocatedSlot = allocatedSlots.remove(allocationID); + } + + if (allocatedSlot != null) { + log.debug("Failed allocated slot [{}]: {}", allocationID, cause.getMessage()); + + // notify TaskExecutor about the failure + 
allocatedSlot.getTaskManagerGateway().freeSlot(allocationID, cause, rpcTimeout); + // release the slot. + // since it is not in 'allocatedSlots' any more, it will be dropped o return' + allocatedSlot.releasePayload(cause); + + final ResourceID taskManagerId = allocatedSlot.getTaskManagerId(); + + if (!availableSlots.containsTaskManager(taskManagerId) && !allocatedSlots.containResource(taskManagerId)) { + return Optional.of(taskManagerId); + } + } + + return Optional.empty(); + } + + // ------------------------------------------------------------------------ + // Resource + // ------------------------------------------------------------------------ + + /** + * Register TaskManager to this pool, only those slots come from registered TaskManager will be considered valid. + * Also it provides a way for us to keep "dead" or "abnormal" TaskManagers out of this pool. + * + * @param resourceID The id of the TaskManager + */ + @Override + public boolean registerTaskManager(final ResourceID resourceID) { + + jmMainThreadScheduledExecutor.assertRunningInMainThread(); + + log.debug("Register new TaskExecutor {}.", resourceID); + return registeredTaskManagers.add(resourceID); + } + + /** + * Unregister TaskManager from this pool, all the related slots will be released and tasks be canceled. Called + * when we find some TaskManager becomes "dead" or "abnormal", and we decide to not using slots from it anymore. 
+ * + * @param resourceId The id of the TaskManager + * @param cause for the releasing of the TaskManager + */ + @Override + public boolean releaseTaskManager(final ResourceID resourceId, final Exception cause) { + + jmMainThreadScheduledExecutor.assertRunningInMainThread(); + + if (registeredTaskManagers.remove(resourceId)) { + releaseTaskManagerInternal(resourceId, cause); + return true; + } else { + return false; + } + } + + // ------------------------------------------------------------------------ + // Internal methods + // ------------------------------------------------------------------------ + + @VisibleForTesting + protected void timeoutPendingSlotRequest(SlotRequestId slotRequestId) { + log.info("Pending slot request [{}] timed out.", slotRequestId); + removePendingRequest(slotRequestId); + } + + private void releaseTaskManagerInternal(final ResourceID resourceId, final Exception cause) { + final Set<AllocatedSlot> removedSlots = new HashSet<>(allocatedSlots.removeSlotsForTaskManager(resourceId)); + + for (AllocatedSlot allocatedSlot : removedSlots) { + allocatedSlot.releasePayload(cause); + } + + removedSlots.addAll(availableSlots.removeAllForTaskManager(resourceId)); + + for (AllocatedSlot removedSlot : removedSlots) { + TaskManagerGateway taskManagerGateway = removedSlot.getTaskManagerGateway(); + taskManagerGateway.freeSlot(removedSlot.getAllocationId(), cause, rpcTimeout); + } + } + + /** + * Check the available slots, release the slot that is idle for a long time. 
+ */ + private void checkIdleSlot() { + + // The timestamp in SlotAndTimestamp is relative + final long currentRelativeTimeMillis = clock.relativeTimeMillis(); + + final List<AllocatedSlot> expiredSlots = new ArrayList<>(availableSlots.size()); + + for (SlotAndTimestamp slotAndTimestamp : availableSlots.availableSlots.values()) { + if (currentRelativeTimeMillis - slotAndTimestamp.timestamp > idleSlotTimeout.toMilliseconds()) { + expiredSlots.add(slotAndTimestamp.slot); + } + } + + final FlinkException cause = new FlinkException("Releasing idle slot."); + + for (AllocatedSlot expiredSlot : expiredSlots) { + final AllocationID allocationID = expiredSlot.getAllocationId(); + if (availableSlots.tryRemove(allocationID) != null) { + + log.info("Releasing idle slot [{}].", allocationID); + final CompletableFuture<Acknowledge> freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot( + allocationID, + cause, + rpcTimeout); + + freeSlotFuture.whenCompleteAsync( + (Acknowledge ignored, Throwable throwable) -> { + if (throwable != null) { + if (registeredTaskManagers.contains(expiredSlot.getTaskManagerId())) { + log.debug("Releasing slot [{}] of registered TaskExecutor {} failed. " + + "Trying to fulfill a different slot request.", allocationID, expiredSlot.getTaskManagerId(), + throwable); + tryFulfillSlotRequestOrMakeAvailable(expiredSlot); + } else { + log.debug("Releasing slot [{}] failed and owning TaskExecutor {} is no " + + "longer registered. Discarding slot.", allocationID, expiredSlot.getTaskManagerId()); + } + } + }, + jmMainThreadScheduledExecutor); + } + } + + scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout); + } + + /** + * Clear the internal state of the SlotPool. 
+ */ + private void clear() { + availableSlots.clear(); + allocatedSlots.clear(); + pendingRequests.clear(); + waitingForResourceManager.clear(); + registeredTaskManagers.clear(); + } + + // ------------------------------------------------------------------------ + // Methods for tests + // ------------------------------------------------------------------------ + + private void scheduledLogStatus() { + log.debug(printStatus()); + scheduleRunAsync(this::scheduledLogStatus, STATUS_LOG_INTERVAL_MS, TimeUnit.MILLISECONDS); + } + + private String printStatus() { + + final StringBuilder builder = new StringBuilder(1024).append("Slot Pool Status:\n"); + + builder.append("\tstatus: "); + if (resourceManagerGateway != null) { + builder.append("connected to ").append(resourceManagerGateway.getAddress()).append('\n'); + } else { + builder.append("unconnected and waiting for ResourceManager ") + .append(waitingForResourceManager) + .append('\n'); + } + + builder.append("\tregistered TaskManagers: ").append(registeredTaskManagers).append('\n'); + + builder.append("\tavailable slots: ").append(availableSlots.printAllSlots()).append('\n'); + builder.append("\tallocated slots: ").append(allocatedSlots.printAllSlots()).append('\n'); + + builder.append("\tpending requests: ").append(pendingRequests.values()).append('\n'); + + builder.append("\t}\n"); + return builder.toString(); + } + + @VisibleForTesting + protected AllocatedSlots getAllocatedSlots() { + return allocatedSlots; + } + + @VisibleForTesting + protected AvailableSlots getAvailableSlots() { + return availableSlots; + } + + @VisibleForTesting + DualKeyMap<SlotRequestId, AllocationID, PendingRequest> getPendingRequests() { + return pendingRequests; + } + + @VisibleForTesting + Map<SlotRequestId, PendingRequest> getWaitingForResourceManager() { + return waitingForResourceManager; + } + + @VisibleForTesting + void triggerCheckIdleSlot() { + runAsync(this::checkIdleSlot); + } + + /** + * Execute the runnable in the main 
thread of the underlying RPC endpoint. Review comment: But isn't this correct? The underlying RPC endpoint is the job master. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services