KarmaGYZ commented on a change in pull request #14647:
URL: https://github.com/apache/flink/pull/14647#discussion_r567581423



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
##########
@@ -0,0 +1,790 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.groups.SlotManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.slots.ResourceCounter;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.slots.ResourceRequirements;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Implementation of {@link SlotManager} supporting fine-grained resource 
management. */
+public class FineGrainedSlotManager implements SlotManager {
+    private static final Logger LOG = 
LoggerFactory.getLogger(FineGrainedSlotManager.class);
+
+    private final TaskManagerTracker taskManagerTracker;
+    private final ResourceTracker resourceTracker;
+    private final ResourceAllocationStrategy resourceAllocationStrategy;
+
+    private final SlotStatusSyncer slotStatusSyncer;
+
+    /** Scheduled executor for timeouts. */
+    private final ScheduledExecutor scheduledExecutor;
+
+    /** Timeout after which an unused TaskManager is released. */
+    private final Time taskManagerTimeout;
+
+    private final SlotManagerMetricGroup slotManagerMetricGroup;
+
+    private final Map<JobID, String> jobMasterTargetAddresses = new 
HashMap<>();
+
+    /** Defines the max limitation of the total number of task executors. */
+    private final int maxTaskManagerNum;
+
+    /** Defines the number of redundant task executors. */
+    private final int redundantTaskManagerNum;
+
+    /**
+     * Release task executor only when each produced result partition is 
either consumed or failed.
+     */
+    private final boolean waitResultConsumedBeforeRelease;
+
+    /** The default resource spec of workers to request. */
+    private final WorkerResourceSpec defaultWorkerResourceSpec;
+
+    /** The resource profile of default slot. */
+    private final ResourceProfile defaultSlotResourceProfile;
+
+    private boolean sendNotEnoughResourceNotifications = true;
+
+    /** ResourceManager's id. */
+    @Nullable private ResourceManagerId resourceManagerId;
+
+    /** Executor for future callbacks which have to be "synchronized". */
+    @Nullable private Executor mainThreadExecutor;
+
+    /** Callbacks for resource (de-)allocations. */
+    @Nullable private ResourceActions resourceActions;
+
+    private ScheduledFuture<?> taskManagerTimeoutsAndRedundancyCheck;
+
+    /** True iff the component has been started. */
+    private boolean started;
+
+    public FineGrainedSlotManager(
+            ScheduledExecutor scheduledExecutor,
+            SlotManagerConfiguration slotManagerConfiguration,
+            SlotManagerMetricGroup slotManagerMetricGroup,
+            ResourceTracker resourceTracker,
+            TaskManagerTracker taskManagerTracker) {
+
+        this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
+
+        Preconditions.checkNotNull(slotManagerConfiguration);
+        this.taskManagerTimeout = 
slotManagerConfiguration.getTaskManagerTimeout();
+        this.redundantTaskManagerNum = 
slotManagerConfiguration.getRedundantTaskManagerNum();
+        this.waitResultConsumedBeforeRelease =
+                slotManagerConfiguration.isWaitResultConsumedBeforeRelease();
+        this.defaultWorkerResourceSpec = 
slotManagerConfiguration.getDefaultWorkerResourceSpec();
+        this.resourceAllocationStrategy = 
slotManagerConfiguration.getResourceAllocationStrategy();
+        int numSlotsPerWorker = 
slotManagerConfiguration.getNumSlotsPerWorker();
+        this.maxTaskManagerNum = slotManagerConfiguration.getMaxSlotNum() / 
numSlotsPerWorker;
+
+        this.slotManagerMetricGroup = 
Preconditions.checkNotNull(slotManagerMetricGroup);
+        this.defaultSlotResourceProfile =
+                SlotManagerUtils.generateDefaultSlotResourceProfile(
+                        defaultWorkerResourceSpec, numSlotsPerWorker);
+
+        this.resourceTracker = Preconditions.checkNotNull(resourceTracker);
+        this.taskManagerTracker = 
Preconditions.checkNotNull(taskManagerTracker);
+        this.slotStatusSyncer =
+                new DefaultSlotStatusSyncer(
+                        taskManagerTracker,
+                        resourceTracker,
+                        
slotManagerConfiguration.getTaskManagerRequestTimeout());
+
+        resourceManagerId = null;
+        resourceActions = null;
+        mainThreadExecutor = null;
+        taskManagerTimeoutsAndRedundancyCheck = null;
+
+        started = false;
+    }
+
+    /**
+     * Switches the "not enough resources" notifications on or off.
+     *
+     * @param failUnfulfillableRequest whether jobs should be notified about missing resources
+     */
+    @Override
+    public void setFailUnfulfillableRequest(boolean failUnfulfillableRequest) {
+        // Acts as a grace period, e.g. right after the cluster was started, so that task
+        // executors get time to connect.
+        sendNotEnoughResourceNotifications = failUnfulfillableRequest;
+
+        if (!failUnfulfillableRequest) {
+            return;
+        }
+
+        // Notifications were just enabled: re-evaluate the outstanding requirements.
+        checkResourceRequirements();
+    }
+
+    // 
---------------------------------------------------------------------------------------------
+    // Component lifecycle methods
+    // 
---------------------------------------------------------------------------------------------
+
+    /**
+     * Starts the slot manager: stores the leader id, main thread executor and resource actions,
+     * schedules the periodic task manager check and registers the slot metrics.
+     *
+     * @param newResourceManagerId to use for communication with the task managers
+     * @param newMainThreadExecutor to use to run code in the ResourceManager's main thread
+     * @param newResourceActions to use for resource (de-)allocations
+     */
+    @Override
+    public void start(
+            ResourceManagerId newResourceManagerId,
+            Executor newMainThreadExecutor,
+            ResourceActions newResourceActions) {
+        LOG.info("Starting the slot manager.");
+
+        resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
+        mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
+        resourceActions = Preconditions.checkNotNull(newResourceActions);
+
+        started = true;
+
+        // The scheduler only triggers the check; the check itself is handed over to the main
+        // thread executor.
+        final Runnable periodicCheck =
+                () -> mainThreadExecutor.execute(this::checkTaskManagerTimeoutsAndRedundancy);
+        final long checkIntervalMillis = taskManagerTimeout.toMilliseconds();
+        taskManagerTimeoutsAndRedundancyCheck =
+                scheduledExecutor.scheduleWithFixedDelay(
+                        periodicCheck, 0L, checkIntervalMillis, TimeUnit.MILLISECONDS);
+
+        registerSlotManagerMetrics();
+    }
+
+    /**
+     * Registers two gauges on the slot manager metric group: the number of free slots and the
+     * total number of registered slots. Both gauges read the current value on each evaluation.
+     */
+    private void registerSlotManagerMetrics() {
+        slotManagerMetricGroup.gauge(
+                MetricNames.TASK_SLOTS_AVAILABLE, () -> (long) 
getNumberFreeSlots());
+        slotManagerMetricGroup.gauge(
+                MetricNames.TASK_SLOTS_TOTAL, () -> (long) 
getNumberRegisteredSlots());
+    }
+
+    /**
+     * Suspends the component and clears its internal state. Does nothing if the slot manager has
+     * not been started.
+     */
+    @Override
+    public void suspend() {
+        if (!started) {
+            return;
+        }
+
+        LOG.info("Suspending the slot manager.");
+
+        resourceTracker.clear();
+
+        // Stop the periodic timeout checks for the task managers.
+        final ScheduledFuture<?> periodicCheck = taskManagerTimeoutsAndRedundancyCheck;
+        if (periodicCheck != null) {
+            periodicCheck.cancel(false);
+            taskManagerTimeoutsAndRedundancyCheck = null;
+        }
+
+        // Work on a copy, since unregistering removes entries from the tracker.
+        new ArrayList<>(taskManagerTracker.getRegisteredTaskManagers())
+                .forEach(
+                        taskManager ->
+                                unregisterTaskManager(
+                                        taskManager.getInstanceId(),
+                                        new SlotManagerException(
+                                                "The slot manager is being suspended.")));
+
+        resourceManagerId = null;
+        resourceActions = null;
+        started = false;
+    }
+
+    /**
+     * Closes the slot manager: suspends it (a no-op if it was never started) and then closes the
+     * slot manager metric group.
+     *
+     * @throws Exception if the close operation fails
+     */
+    @Override
+    public void close() throws Exception {
+        LOG.info("Closing the slot manager.");
+
+        suspend();
+        slotManagerMetricGroup.close();
+    }
+
+    // 
---------------------------------------------------------------------------------------------
+    // Public API
+    // 
---------------------------------------------------------------------------------------------
+
+    /**
+     * Records the resource requirements of a job and re-checks whether all requirements can be
+     * fulfilled.
+     *
+     * @param resourceRequirements the job's id, target address and current requirements
+     */
+    @Override
+    public void processResourceRequirements(ResourceRequirements resourceRequirements) {
+        checkInit();
+
+        final JobID jobId = resourceRequirements.getJobId();
+        LOG.debug(
+                "Received resource requirements from job {}: {}",
+                jobId,
+                resourceRequirements.getResourceRequirements());
+
+        // The job master address is only kept while the job has requirements.
+        final boolean hasRequirements = !resourceRequirements.getResourceRequirements().isEmpty();
+        if (hasRequirements) {
+            jobMasterTargetAddresses.put(jobId, resourceRequirements.getTargetAddress());
+        } else {
+            jobMasterTargetAddresses.remove(jobId);
+        }
+
+        resourceTracker.notifyResourceRequirements(
+                jobId, resourceRequirements.getResourceRequirements());
+        checkResourceRequirements();
+    }
+
+    /**
+     * Registers a new task manager at the slot manager. This will make the task managers slots
+     * known and, thus, available for allocation.
+     *
+     * <p>A task manager that is already registered only has its slot report processed. A task
+     * manager that would exceed the maximum number of task managers is released again.
+     *
+     * @param taskExecutorConnection for the new task manager
+     * @param initialSlotReport for the new task manager
+     * @param totalResourceProfile of the new task manager
+     * @param defaultSlotResourceProfile of the new task manager
+     * @return True if the task manager has not been registered before and is registered
+     *     successfully; otherwise false
+     */
+    @Override
+    public boolean registerTaskManager(
+            final TaskExecutorConnection taskExecutorConnection,
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile) {
+        checkInit();
+        LOG.debug(
+                "Registering task executor {} under {} at the slot manager.",
+                taskExecutorConnection.getResourceID(),
+                taskExecutorConnection.getInstanceID());
+
+        // Task managers are identified by their instance id.
+        final InstanceID instanceId = taskExecutorConnection.getInstanceID();
+
+        if (taskManagerTracker.getRegisteredTaskManager(instanceId).isPresent()) {
+            LOG.debug(
+                    "Task executor {} was already registered.",
+                    taskExecutorConnection.getResourceID());
+            reportSlotStatus(instanceId, initialSlotReport);
+            return false;
+        }
+
+        if (isMaxSlotNumExceededAfterRegistration(
+                initialSlotReport, totalResourceProfile, defaultSlotResourceProfile)) {
+            LOG.info(
+                    "The total number of slots exceeds the max limitation {}, releasing the excess task executor.",
+                    maxTaskManagerNum);
+            resourceActions.releaseResource(
+                    instanceId,
+                    new FlinkException("The total number of slots exceeds the max limitation."));
+            return false;
+        }
+
+        taskManagerTracker.addTaskManager(
+                taskExecutorConnection, totalResourceProfile, defaultSlotResourceProfile);
+
+        final Optional<PendingTaskManagerId> pendingMatch =
+                findMatchingPendingTaskManager(totalResourceProfile, defaultSlotResourceProfile);
+        final boolean takesOverPendingTaskManager =
+                !initialSlotReport.withAllocatedSlot() && pendingMatch.isPresent();
+
+        if (takesOverPendingTaskManager) {
+            // The new task manager replaces the matching pending one and receives the
+            // allocations that were planned for it.
+            taskManagerTracker.removePendingTaskManager(pendingMatch.get());
+            allocateSlotsForRegisteredPendingTaskManager(pendingMatch.get(), instanceId);
+        } else {
+            slotStatusSyncer.reportSlotStatus(instanceId, initialSlotReport);
+            checkResourceRequirements();
+        }
+        return true;
+    }
+
+    private void allocateSlotsForRegisteredPendingTaskManager(
+            PendingTaskManagerId pendingTaskManagerId, InstanceID instanceId) {
+        Map<JobID, Map<InstanceID, ResourceCounter>> allocations = new 
HashMap<>();
+        taskManagerTracker
+                
.getPendingAllocationsOfPendingTaskManager(pendingTaskManagerId)
+                .forEach(
+                        ((jobId, resourceCounter) ->
+                                allocations.put(
+                                        jobId,
+                                        Collections.singletonMap(instanceId, 
resourceCounter))));
+        allocateSlotsAccordingTo(allocations);
+    }
+
+    private boolean isMaxSlotNumExceededAfterRegistration(
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile) {
+        if (!isMaxSlotNumExceededAfterAdding(1)) {
+            return false;
+        }
+
+        if (initialSlotReport.withAllocatedSlot()) {
+            return isMaxSlotNumExceededAfterAdding(1);
+        }
+
+        return isMaxSlotNumExceededAfterAdding(
+                findMatchingPendingTaskManager(totalResourceProfile, 
defaultSlotResourceProfile)
+                                .isPresent()
+                        ? 0
+                        : 1);
+    }
+
+    private boolean isMaxSlotNumExceededAfterAdding(int numNewTaskExecutor) {
+        return taskManagerTracker.getPendingTaskManagers().size()
+                        + taskManagerTracker.getRegisteredTaskManagers().size()
+                        + numNewTaskExecutor
+                > maxTaskManagerNum;
+    }
+
+    private Optional<PendingTaskManagerId> findMatchingPendingTaskManager(
+            ResourceProfile totalResourceProfile, ResourceProfile 
defaultSlotResourceProfile) {
+        return taskManagerTracker.getPendingTaskManagers().stream()
+                .filter(
+                        pendingTaskManager ->
+                                pendingTaskManager
+                                                .getTotalResourceProfile()
+                                                .equals(totalResourceProfile)
+                                        && pendingTaskManager
+                                                
.getDefaultSlotResourceProfile()
+                                                
.equals(defaultSlotResourceProfile))
+                .max(
+                        Comparator.comparingInt(
+                                pendingTaskManager ->
+                                        taskManagerTracker
+                                                
.getPendingAllocationsOfPendingTaskManager(
+                                                        pendingTaskManager
+                                                                
.getPendingTaskManagerId())
+                                                .size()))
+                .map(PendingTaskManager::getPendingTaskManagerId);
+    }
+
+    /**
+     * Unregisters the task manager with the given instance id: frees all of its allocated slots,
+     * removes it from the tracker and re-checks the resource requirements.
+     *
+     * @param instanceId identifying the task manager to unregister
+     * @param cause for unregistering the task manager (not used by this method)
+     * @return true if the task manager was registered and has been removed, otherwise false
+     */
+    @Override
+    public boolean unregisterTaskManager(InstanceID instanceId, Exception cause) {
+        checkInit();
+
+        LOG.debug("Unregistering task executor {} from the slot manager.", instanceId);
+
+        if (!taskManagerTracker.getRegisteredTaskManager(instanceId).isPresent()) {
+            LOG.debug(
+                    "There is no task executor registered with instance ID {}. Ignoring this message.",
+                    instanceId);
+            return false;
+        }
+
+        // Iterate over a snapshot of the allocation ids (presumably because freeing a slot
+        // modifies the tracked allocations).
+        final Set<AllocationID> allocatedSlotIds =
+                new HashSet<>(
+                        taskManagerTracker
+                                .getRegisteredTaskManager(instanceId)
+                                .get()
+                                .getAllocatedSlots()
+                                .keySet());
+        for (AllocationID allocatedSlotId : allocatedSlotIds) {
+            slotStatusSyncer.freeSlot(allocatedSlotId);
+        }
+
+        taskManagerTracker.removeTaskManager(instanceId);
+        checkResourceRequirements();
+        return true;
+    }
+
+    /**
+     * Reports the current slot allocations for a task manager identified by the given instance id.
+     * Reports of task managers that are not registered are ignored.
+     *
+     * @param instanceId identifying the task manager for which to report the slot status
+     * @param slotReport containing the status for all of its slots
+     * @return true if the slot status has been updated successfully, otherwise false
+     */
+    @Override
+    public boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport) {
+        checkInit();
+
+        LOG.debug("Received slot report from instance {}: {}.", instanceId, slotReport);
+
+        final boolean isRegistered =
+                taskManagerTracker.getRegisteredTaskManager(instanceId).isPresent();
+        if (!isRegistered) {
+            LOG.debug(
+                    "Received slot report for unknown task manager with instance id {}. Ignoring this report.",
+                    instanceId);
+            return false;
+        }
+
+        slotStatusSyncer.reportSlotStatus(instanceId, slotReport);
+        checkResourceRequirements();
+        return true;
+    }
+
+    /**
+     * Free the given slot from the given allocation. If the slot is still allocated by the given
+     * allocation id, then the slot will be freed.
+     *
+     * @param slotId identifying the slot to free, will be ignored
+     * @param allocationId with which the slot is presumably allocated
+     */
+    @Override
+    public void freeSlot(SlotID slotId, AllocationID allocationId) {
+        checkInit();
+        LOG.debug("Freeing slot {}.", allocationId);
+
+        final boolean isKnownAllocation =
+                taskManagerTracker.getAllocatedOrPendingSlot(allocationId).isPresent();
+        if (!isKnownAllocation) {
+            LOG.debug(
+                    "Trying to free a slot {} which has not been allocated. Ignoring this message.",
+                    allocationId);
+            return;
+        }
+
+        slotStatusSyncer.freeSlot(allocationId);
+        // Freed resources may allow outstanding requirements to be fulfilled.
+        checkResourceRequirements();
+    }
+
+    // 
---------------------------------------------------------------------------------------------
+    // Requirement matching
+    // 
---------------------------------------------------------------------------------------------
+
+    /**
+     * Tries to fulfill the currently missing resources and acts on the outcome.
+     *
+     * <p>The missing resources are handed to the resource allocation strategy together with the
+     * available resources of the registered task managers and the pending task managers. The
+     * result is then applied in four steps: slots are allocated on registered task managers, new
+     * task managers are requested, the allocations planned on pending task managers are recorded,
+     * and jobs whose requirements can not be fulfilled are notified (if notifications are
+     * enabled). Does nothing if no resources are missing.
+     */
+    private void checkResourceRequirements() {
+        final Map<JobID, Collection<ResourceRequirement>> missingResources =
+                resourceTracker.getMissingResources();
+        if (missingResources.isEmpty()) {
+            return;
+        }
+
+        // Per registered task manager: (available resource, default slot resource profile).
+        final Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> availableResources =
+                taskManagerTracker.getRegisteredTaskManagers().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        TaskManagerInfo::getInstanceId,
+                                        taskManager ->
+                                                Tuple2.of(
+                                                        taskManager.getAvailableResource(),
+                                                        taskManager
+                                                                .getDefaultSlotResourceProfile())));
+        final ResourceAllocationResult result =
+                resourceAllocationStrategy.tryFulfillRequirements(
+                        missingResources,
+                        availableResources,
+                        new ArrayList<>(taskManagerTracker.getPendingTaskManagers()));
+
+        // Allocate slots according to the result
+        allocateSlotsAccordingTo(result.getRegisteredResourceAllocationResult());
+
+        // Allocate task managers according to the result
+        final Set<PendingTaskManagerId> failAllocations =
+                allocateTaskManagersAccordingTo(result.getPendingTaskManagersToBeAllocated());
+
+        // Record slot allocation of pending task managers. Task managers that could not be
+        // requested must be dropped here: keeping only them (retainAll) would throw away every
+        // allocation planned on a pending task manager that actually exists.
+        final Map<PendingTaskManagerId, Map<JobID, ResourceCounter>>
+                pendingResourceAllocationResult =
+                        new HashMap<>(result.getPendingResourceAllocationResult());
+        pendingResourceAllocationResult.keySet().removeAll(failAllocations);
+        taskManagerTracker.recordPendingAllocations(pendingResourceAllocationResult);
+
+        // Notify jobs that can not be fulfilled. Jobs that were planned on a task manager that
+        // could not be requested are unfulfillable as well.
+        final Set<JobID> unfulfillableJobs = new HashSet<>(result.getUnfulfillableJobs());
+        for (PendingTaskManagerId pendingTaskManagerId : failAllocations) {
+            unfulfillableJobs.addAll(
+                    result.getPendingResourceAllocationResult().get(pendingTaskManagerId).keySet());
+        }
+        if (sendNotEnoughResourceNotifications) {
+            for (JobID jobId : unfulfillableJobs) {
+                LOG.warn("Could not fulfill resource requirements of job {}.", jobId);
+                resourceActions.notifyNotEnoughResourcesAvailable(
+                        jobId, resourceTracker.getAcquiredResources(jobId));
+            }
+        }
+    }
+
+    /**
+     * Requests one slot per counted resource profile for every job / task manager combination in
+     * the given plan. If any of the allocations fails, the resource requirements are checked
+     * again on the main thread executor.
+     *
+     * @param result planned allocations, keyed by job and then by task manager instance
+     */
+    private void allocateSlotsAccordingTo(Map<JobID, Map<InstanceID, ResourceCounter>> result) {
+        final List<CompletableFuture<Void>> slotAllocations = new ArrayList<>();
+
+        for (Map.Entry<JobID, Map<InstanceID, ResourceCounter>> perJob : result.entrySet()) {
+            final JobID jobId = perJob.getKey();
+            for (Map.Entry<InstanceID, ResourceCounter> perInstance :
+                    perJob.getValue().entrySet()) {
+                final InstanceID instanceId = perInstance.getKey();
+                for (Map.Entry<ResourceProfile, Integer> profileWithCount :
+                        perInstance.getValue().getResourceProfilesWithCount().entrySet()) {
+                    final ResourceProfile profile = profileWithCount.getKey();
+                    int remaining = profileWithCount.getValue();
+                    while (remaining-- > 0) {
+                        slotAllocations.add(
+                                slotStatusSyncer.allocateSlot(
+                                        instanceId,
+                                        jobId,
+                                        jobMasterTargetAddresses.get(jobId),
+                                        profile,
+                                        resourceManagerId,
+                                        mainThreadExecutor));
+                    }
+                }
+            }
+        }
+
+        FutureUtils.combineAll(slotAllocations)
+                .whenCompleteAsync(
+                        (ignored, failure) -> {
+                            // If there is allocation failure, we need to trigger it again.
+                            if (failure != null) {
+                                checkResourceRequirements();
+                            }
+                        },
+                        mainThreadExecutor);
+    }
+
+    /**
+     * Requests a resource for each of the given pending task managers.
+     *
+     * @param pendingTaskManagers pending task managers to allocate
+     * @return ids of the pending task managers that can not be allocated
+     */
+    private Set<PendingTaskManagerId> allocateTaskManagersAccordingTo(
+            List<PendingTaskManager> pendingTaskManagers) {
+        final Set<PendingTaskManagerId> notAllocated = new HashSet<>();
+        for (PendingTaskManager candidate : pendingTaskManagers) {
+            final boolean allocated = allocateResource(candidate);
+            if (allocated) {
+                continue;
+            }
+            notAllocated.add(candidate.getPendingTaskManagerId());
+        }
+        return notAllocated;
+    }
+
+    // 
---------------------------------------------------------------------------------------------
+    // Legacy APIs
+    // 
---------------------------------------------------------------------------------------------
+
+    /** Returns the number of registered slots as reported by the tracker's status overview. */
+    @Override
+    public int getNumberRegisteredSlots() {
+        return 
taskManagerTracker.getStatusOverview().getNumberRegisteredSlots();
+    }
+
+    /**
+     * Returns the number of registered slots of the given task manager as reported by the
+     * tracker's status overview.
+     *
+     * @param instanceId identifying the task manager
+     */
+    @Override
+    public int getNumberRegisteredSlotsOf(InstanceID instanceId) {
+        return 
taskManagerTracker.getStatusOverview().getNumberRegisteredSlotsOf(instanceId);
+    }
+
+    /** Returns the number of free slots as reported by the tracker's status overview. */
+    @Override
+    public int getNumberFreeSlots() {
+        return taskManagerTracker.getStatusOverview().getNumberFreeSlots();
+    }
+
+    /**
+     * Returns the number of free slots of the given task manager as reported by the tracker's
+     * status overview.
+     *
+     * @param instanceId identifying the task manager
+     */
+    @Override
+    public int getNumberFreeSlotsOf(InstanceID instanceId) {
+        return 
taskManagerTracker.getStatusOverview().getNumberFreeSlotsOf(instanceId);
+    }
+
+    /**
+     * Counts the pending task managers per worker resource spec, where the spec is derived from
+     * each pending task manager's total resource profile.
+     *
+     * @return number of pending task managers for each worker resource spec
+     */
+    @Override
+    public Map<WorkerResourceSpec, Integer> getRequiredResources() {
+        final Map<WorkerResourceSpec, Integer> pendingWorkersPerSpec = new HashMap<>();
+        for (PendingTaskManager pendingTaskManager : taskManagerTracker.getPendingTaskManagers()) {
+            final WorkerResourceSpec spec =
+                    WorkerResourceSpec.fromTotalResourceProfile(
+                            pendingTaskManager.getTotalResourceProfile());
+            pendingWorkersPerSpec.merge(spec, 1, Integer::sum);
+        }
+        return pendingWorkersPerSpec;
+    }
+
+    /** Returns the registered resource as reported by the tracker's status overview. */
+    @Override
+    public ResourceProfile getRegisteredResource() {
+        return taskManagerTracker.getStatusOverview().getRegisteredResource();
+    }
+
+    /**
+     * Returns the registered resource of the given task manager as reported by the tracker's
+     * status overview.
+     *
+     * @param instanceID identifying the task manager
+     */
+    @Override
+    public ResourceProfile getRegisteredResourceOf(InstanceID instanceID) {
+        return 
taskManagerTracker.getStatusOverview().getRegisteredResourceOf(instanceID);
+    }
+
+    /** Returns the free resource as reported by the tracker's status overview. */
+    @Override
+    public ResourceProfile getFreeResource() {
+        return taskManagerTracker.getStatusOverview().getFreeResource();
+    }
+
+    /**
+     * Returns the free resource of the given task manager as reported by the tracker's status
+     * overview.
+     *
+     * @param instanceID identifying the task manager
+     * @return the free (not the registered) resource of that task manager
+     */
+    @Override
+    public ResourceProfile getFreeResourceOf(InstanceID instanceID) {
+        // Previously delegated to getRegisteredResourceOf, which reported the total instead of
+        // the free resource.
+        // NOTE(review): assumes the status overview offers getFreeResourceOf, mirroring
+        // getNumberFreeSlotsOf / getRegisteredResourceOf - confirm against its interface.
+        return taskManagerTracker.getStatusOverview().getFreeResourceOf(instanceID);
+    }
+
+    /**
+     * Not supported by this slot manager.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public int getNumberPendingSlotRequests() {
+        // only exists for testing purposes
+        throw new UnsupportedOperationException();
+    }
+
+    // 
---------------------------------------------------------------------------------------------
+    // Internal periodic check methods
+    // 
---------------------------------------------------------------------------------------------
+
+    private void checkTaskManagerTimeoutsAndRedundancy() {
+        Map<InstanceID, TaskExecutorConnection> timeoutTaskManagers = 
getTimeOutTaskManagers();
+        if (!timeoutTaskManagers.isEmpty()) {
+            int freeTaskManagersNum =
+                    (int)
+                            
taskManagerTracker.getRegisteredTaskManagers().stream()
+                                    .filter(TaskManagerInfo::isIdle)
+                                    .count();
+            int taskManagersDiff = redundantTaskManagerNum - 
freeTaskManagersNum;

Review comment:
       `taskManagerTimeoutsAndRedundancyCheck` -> `taskManagerTimeoutsCheck`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to