xintongsong commented on a change in pull request #11615:
URL: https://github.com/apache/flink/pull/11615#discussion_r411062867
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java ##########
@@ -64,7 +66,9 @@ public SlotManagerConfiguration(
 		this.slotMatchingStrategy = Preconditions.checkNotNull(slotMatchingStrategy);
 		this.defaultWorkerResourceSpec = Preconditions.checkNotNull(defaultWorkerResourceSpec);
 		Preconditions.checkState(numSlotsPerWorker > 0);
+		Preconditions.checkState(maxSlotNum > 0);
 		this.numSlotsPerWorker = numSlotsPerWorker;
+		this.maxSlotNum = maxSlotNum;

Review comment:
nit: this order might be more readable:
```suggestion
		this.numSlotsPerWorker = numSlotsPerWorker;
		this.maxSlotNum = maxSlotNum;
		Preconditions.checkState(numSlotsPerWorker > 0);
		Preconditions.checkState(maxSlotNum > 0);
```

########## File path: flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java ##########
@@ -76,6 +77,16 @@
 			.withDeprecatedKeys("yarn.heap-cutoff-min")
 			.withDescription("Minimum amount of heap memory to remove in Job Master containers, as a safety margin.");
 
+	@Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
+	public static final ConfigOption<Integer> MAX_SLOT_NUM = ConfigOptions
+		.key("slotmanager.number-of-slots.max")
+		.intType()
+		.defaultValue(Integer.MAX_VALUE)
+		.withDescription("Defines the maximum number of slots that the Flink cluster allocates. This configuration option " +
+			"is meant for limiting the memory consumption for batch workloads. It is not recommended to configure this option " +
+			"for streaming workloads, which may fail if there are not enough slots. Note that this configuration option does not take" +
+			" effect for standalone clusters, where how many slots are allocated is not controlled by Flink.");

Review comment:
nit: better to keep the space at the end of the previous line, to align with the code style of the lines above.
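For illustration, a minimal sketch of the assign-then-validate order suggested in the first comment. It assumes a hypothetical `SlotLimitConfiguration` class that models only the two slot-count fields of `SlotManagerConfiguration`; the local `checkState` helper mimics Flink's `Preconditions.checkState` (which throws `IllegalStateException`), and `DEFAULT_MAX_SLOT_NUM` mirrors the proposed `Integer.MAX_VALUE` default of `slotmanager.number-of-slots.max`, i.e. effectively no limit.

```java
/**
 * Hypothetical, simplified stand-in for SlotManagerConfiguration: only the two
 * slot-count fields are modelled. Fields are assigned first and validated
 * afterwards, following the order suggested in the review.
 */
final class SlotLimitConfiguration {

    /** Mirrors the proposed default of slotmanager.number-of-slots.max: effectively "no limit". */
    static final int DEFAULT_MAX_SLOT_NUM = Integer.MAX_VALUE;

    private final int numSlotsPerWorker;
    private final int maxSlotNum;

    SlotLimitConfiguration(int numSlotsPerWorker, int maxSlotNum) {
        // Assignments grouped together ...
        this.numSlotsPerWorker = numSlotsPerWorker;
        this.maxSlotNum = maxSlotNum;
        // ... followed by the grouped sanity checks.
        checkState(numSlotsPerWorker > 0, "numSlotsPerWorker must be positive");
        checkState(maxSlotNum > 0, "maxSlotNum must be positive");
    }

    int getNumSlotsPerWorker() {
        return numSlotsPerWorker;
    }

    int getMaxSlotNum() {
        return maxSlotNum;
    }

    // Same contract as Flink's Preconditions.checkState: throws IllegalStateException on false.
    private static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }
}
```

Because the constructor throws before the object can escape, moving the checks after the assignments only affects readability, not behavior.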
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java ##########
@@ -50,14 +50,15 @@
 			ClusterInformation clusterInformation,
 			@Nullable String webInterfaceUrl,
 			ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception {
+		Configuration configForStandaloneResourceManager = ConfigurationUtils.getConfigurationForStandaloneResourceManager(configuration);

Review comment:
```suggestion
		final Configuration effectiveConfiguration = ConfigurationUtils.getConfigurationForStandaloneResourceManager(configuration);
```

########## File path: flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java ##########
@@ -109,6 +114,21 @@ public static Time getStandaloneClusterStartupPeriodTime(Configuration configura
 		return timeout;
 	}
 
+	/**
+	 * Get the configuration for standalone ResourceManager, overwrite invalid configs.
+	 *
+	 * @param configuration configuration object
+	 * @return the configuration for standalone ResourceManager
+	 */
+	public static Configuration getConfigurationForStandaloneResourceManager(Configuration configuration) {

Review comment:
Not sure about putting this method in `ConfigurationUtils`. I don't think it is reused anywhere other than `StandaloneResourceManagerFactory`. Could it be a static method in `StandaloneResourceManagerFactory` instead?

########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java ##########
@@ -658,6 +666,38 @@ private PendingTaskManagerSlot findExactlyMatchingPendingTaskManagerSlot(Resourc
 		return null;
 	}
 
+	private boolean totalSlotNumExceedMaxLimitAfterRegister(SlotReport slotReport) {
+		int numRegisteredSlot = 0;
+		for (SlotStatus ignored : slotReport) {
+			numRegisteredSlot += 1;
+		}
+		// First check if the total number exceed before matching pending slot.
+		if (getNumberRegisteredSlots() + getNumberPendingTaskManagerSlots() + numRegisteredSlot > maxSlotNum) {

Review comment:
IIUC, this `if` is mainly a shortcut that avoids calculating the number of matching registered/pending slots in most cases. I would suggest making that explicit and avoiding nested `if` branches, as follows:
```
if (getNumberRegisteredSlots() + getNumberPendingTaskManagerSlots() + numRegisteredSlot <= maxSlotNum) {
	return false;
}
```

########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java ##########
@@ -658,6 +666,38 @@ private PendingTaskManagerSlot findExactlyMatchingPendingTaskManagerSlot(Resourc
 		return null;
 	}
 
+	private boolean totalSlotNumExceedMaxLimitAfterRegister(SlotReport slotReport) {
+		int numRegisteredSlot = 0;
+		for (SlotStatus ignored : slotReport) {
+			numRegisteredSlot += 1;
+		}

Review comment:
I would suggest adding a method `SlotReport.getNumSlotStatus()`.

########## File path: flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java ##########
@@ -109,6 +114,21 @@ public static Time getStandaloneClusterStartupPeriodTime(Configuration configura
 		return timeout;
 	}
 
+	/**
+	 * Get the configuration for standalone ResourceManager, overwrite invalid configs.
+	 *
+	 * @param configuration configuration object
+	 * @return the configuration for standalone ResourceManager
+	 */
+	public static Configuration getConfigurationForStandaloneResourceManager(Configuration configuration) {
+		// The max slot limit should not take effect for standalone cluster, we overwrite the configure in case user
+		// sets this value by mistake.
+		LOG.warn("The {} should not take effect for standalone cluster, If configured, it will be ignored.", ResourceManagerOptions.MAX_SLOT_NUM.key());

Review comment:
I think this warning should be printed only if `MAX_SLOT_NUM` is explicitly configured.
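A minimal sketch combining two of the suggestions above: a `getNumSlotStatus()` accessor on the report, and an early-return guard in place of the nested `if`. `SimpleSlotReport` and `SlotLimitCheck` are hypothetical stand-ins, not Flink classes; slot statuses are modelled as plain strings, and the number of matching pending slots is passed in as a function so that it is only evaluated when the cheap upper bound already exceeds the limit.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.ToIntFunction;

/** Hypothetical stand-in for SlotReport; slot statuses are simplified to slot-ID strings. */
final class SimpleSlotReport implements Iterable<String> {
    private final List<String> slotStatuses;

    SimpleSlotReport(List<String> slotStatuses) {
        this.slotStatuses = new ArrayList<>(slotStatuses);
    }

    /** The accessor suggested in the review: no need to iterate just to count. */
    int getNumSlotStatus() {
        return slotStatuses.size();
    }

    @Override
    public Iterator<String> iterator() {
        return Collections.unmodifiableList(slotStatuses).iterator();
    }
}

final class SlotLimitCheck {

    /**
     * Guard-clause version of the limit check. The (potentially expensive) matching
     * count is computed only when the upper bound exceeds the limit.
     */
    static boolean totalSlotNumExceedsMaxLimitAfterRegister(
            int numRegisteredSlots,
            int numPendingSlots,
            SimpleSlotReport slotReport,
            ToIntFunction<SimpleSlotReport> numMatchingPendingSlots,
            int maxSlotNum) {
        // Local variables avoid repeating the getter calls.
        final int numReportedSlots = slotReport.getNumSlotStatus();
        final int upperBound = numRegisteredSlots + numPendingSlots + numReportedSlots;

        // Shortcut: even if no reported slot replaces a pending slot, the limit holds.
        if (upperBound <= maxSlotNum) {
            return false;
        }

        // Reported slots that match pending slots replace them instead of adding to the total.
        return upperBound - numMatchingPendingSlots.applyAsInt(slotReport) > maxSlotNum;
    }
}
```

Passing the matching count as a function is only a way to keep the sketch self-contained; in `SlotManagerImpl` it would simply be a call to the private helper after the guard.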
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java ##########
@@ -658,6 +666,38 @@ private PendingTaskManagerSlot findExactlyMatchingPendingTaskManagerSlot(Resourc
 		return null;
 	}
 
+	private boolean totalSlotNumExceedMaxLimitAfterRegister(SlotReport slotReport) {
+		int numRegisteredSlot = 0;
+		for (SlotStatus ignored : slotReport) {
+			numRegisteredSlot += 1;
+		}
+		// First check if the total number exceed before matching pending slot.
+		if (getNumberRegisteredSlots() + getNumberPendingTaskManagerSlots() + numRegisteredSlot > maxSlotNum) {

Review comment:
We could also use local variables to avoid the duplicated calls to `getNumberRegisteredSlots()` and `getNumberPendingTaskManagerSlots()`.

########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java ##########
@@ -658,6 +666,38 @@ private PendingTaskManagerSlot findExactlyMatchingPendingTaskManagerSlot(Resourc
 		return null;
 	}
 
+	private boolean totalSlotNumExceedMaxLimitAfterRegister(SlotReport slotReport) {
+		int numRegisteredSlot = 0;
+		for (SlotStatus ignored : slotReport) {
+			numRegisteredSlot += 1;
+		}
+		// First check if the total number exceed before matching pending slot.
+		if (getNumberRegisteredSlots() + getNumberPendingTaskManagerSlots() + numRegisteredSlot > maxSlotNum) {
+			// Then, check how many exceed slots could be consumed by pending slot.
+			int numMatchingPendingTaskManagerSlots = getNumMatchingPendingTaskManagerSlots(slotReport);
+			int totalSlotNum = getNumberRegisteredSlots() + getNumberPendingTaskManagerSlots() + numRegisteredSlot - numMatchingPendingTaskManagerSlots;
+			if (totalSlotNum > maxSlotNum) {
+				return true;
+			}
+		}
+		return false;
+	}
+
+	private int getNumMatchingPendingTaskManagerSlots(SlotReport slotReport) {
+		Set<TaskManagerSlotId> matchingPendingSlots = new HashSet<>();
+
+		for (SlotStatus slotStatus : slotReport) {
+			for (PendingTaskManagerSlot pendingTaskManagerSlot : pendingSlots.values()) {
+				if (!matchingPendingSlots.contains(pendingTaskManagerSlot.getTaskManagerSlotId()) &&
+					pendingTaskManagerSlot.getResourceProfile().equals(slotStatus.getResourceProfile())) {

Review comment:
I would suggest extracting a method for this line to avoid duplicating code with `findExactlyMatchingPendingTaskManagerSlot`.

########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java ##########
@@ -658,6 +666,38 @@ private PendingTaskManagerSlot findExactlyMatchingPendingTaskManagerSlot(Resourc
 		return null;
 	}
 
+	private boolean totalSlotNumExceedMaxLimitAfterRegister(SlotReport slotReport) {
+		int numRegisteredSlot = 0;
+		for (SlotStatus ignored : slotReport) {
+			numRegisteredSlot += 1;
+		}
+		// First check if the total number exceed before matching pending slot.
+		if (getNumberRegisteredSlots() + getNumberPendingTaskManagerSlots() + numRegisteredSlot > maxSlotNum) {
+			// Then, check how many exceed slots could be consumed by pending slot.
+			int numMatchingPendingTaskManagerSlots = getNumMatchingPendingTaskManagerSlots(slotReport);
+			int totalSlotNum = getNumberRegisteredSlots() + getNumberPendingTaskManagerSlots() + numRegisteredSlot - numMatchingPendingTaskManagerSlots;
+			if (totalSlotNum > maxSlotNum) {
+				return true;
+			}
+		}
+		return false;
+	}
+
+	private int getNumMatchingPendingTaskManagerSlots(SlotReport slotReport) {
+		Set<TaskManagerSlotId> matchingPendingSlots = new HashSet<>();
+
+		for (SlotStatus slotStatus : slotReport) {
+			for (PendingTaskManagerSlot pendingTaskManagerSlot : pendingSlots.values()) {
+				if (!matchingPendingSlots.contains(pendingTaskManagerSlot.getTaskManagerSlotId()) &&
+					pendingTaskManagerSlot.getResourceProfile().equals(slotStatus.getResourceProfile())) {
+					matchingPendingSlots.add(pendingTaskManagerSlot.getTaskManagerSlotId());
+					break;

Review comment:
In general, I would suggest avoiding nested for-loops where possible, although it might be OK in this particular case. If we keep them, I would suggest adding a comment after the `break` for better readability.
```suggestion
					break; // pendingTaskManagerSlot loop
```

########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java ##########
@@ -1540,6 +1555,85 @@ public void testSpreadOutSlotAllocationStrategy() throws Exception {
 		}
 	}
 
+	/**
+	 * Test that the slot manager respect the max limitation of the number of slots when allocate new resource.
+	 */
+	@Test
+	public void testMaxSlotLimitAllocateResource() throws Exception {
+		final int numberSlots = 1;
+		final int maxSlotNum = 1;
+
+		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
+		final JobID jobId = new JobID();
+
+		final AtomicInteger resourceRequests = new AtomicInteger(0);
+		ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
+			.setAllocateResourceFunction(
+				ignored -> {
+					resourceRequests.incrementAndGet();
+					return true;
+				})
+			.build();
+
+		final Configuration configuration = new Configuration();
+		configuration.setInteger(ResourceManagerOptions.MAX_SLOT_NUM, maxSlotNum);
+		configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numberSlots);
+
+		try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions, configuration)) {
+
+			assertTrue("The slot request should be accepted", slotManager.registerSlotRequest(createSlotRequest(jobId)));
+			assertThat(resourceRequests.get(), is(1));
+
+			// The second slot request should not try to allocate a new resource because of the max limitation.
+			assertTrue("The slot request should be accepted", slotManager.registerSlotRequest(createSlotRequest(jobId)));
+			assertThat(resourceRequests.get(), is(1));
+		}
+	}
+
+	/**
+	 * Test that the slot manager release resource when the number of slots exceed max limit when new TaskExecutor registered.
+	 */
+	@Test
+	public void testMaxSlotLimitRegisterResource() throws Exception {
+		final int numberSlots = 1;
+		final int maxSlotNum = 1;
+		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
+
+		final CompletableFuture<InstanceID> releasedResourceFuture = new CompletableFuture<>();
+		ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
+			.setReleaseResourceConsumer((instanceID, e) -> releasedResourceFuture.complete(instanceID))
+			.build();
+
+		final Configuration configuration = new Configuration();
+		configuration.setInteger(ResourceManagerOptions.MAX_SLOT_NUM, maxSlotNum);
+		configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numberSlots);
+
+		final TaskExecutorGateway taskExecutorGateway1 = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
+		final TaskExecutorGateway taskExecutorGateway2 = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
+		final ResourceID resourceId1 = ResourceID.generate();
+		final ResourceID resourceId2 = ResourceID.generate();
+		final TaskExecutorConnection taskManagerConnection1 = new TaskExecutorConnection(resourceId1, taskExecutorGateway1);
+		final TaskExecutorConnection taskManagerConnection2 = new TaskExecutorConnection(resourceId2, taskExecutorGateway2);
+
+		final SlotID slotId1 = new SlotID(resourceId1, 0);
+		final SlotID slotId2 = new SlotID(resourceId1, 0);
+		final SlotStatus slotStatus1 = new SlotStatus(slotId1, ResourceProfile.UNKNOWN);
+		final SlotStatus slotStatus2 = new SlotStatus(slotId2, ResourceProfile.UNKNOWN);
+		final SlotReport slotReport1 = new SlotReport(Collections.singletonList(slotStatus1));
+		final SlotReport slotReport2 = new SlotReport(Collections.singletonList(slotStatus2));
+
+		try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions, configuration)) {
+			slotManager.registerTaskManager(taskManagerConnection1, slotReport1);
+			slotManager.registerTaskManager(taskManagerConnection2, slotReport2);
+
+			assertThat("The number registered slots does not equal the expected number.", slotManager.getNumberRegisteredSlots(), is(equalTo(1)));

Review comment:
```suggestion
			assertThat("The number registered slots does not equal the expected number.", slotManager.getNumberRegisteredSlots(), is(1));
```

########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java ##########
@@ -808,16 +848,29 @@ private void fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequ
 		return Optional.empty();
 	}
 
-	private boolean isFulfillableByRegisteredSlots(ResourceProfile resourceProfile) {
+	private boolean isFulfillableByRegisteredOrPendingSlots(ResourceProfile resourceProfile) {
 		for (TaskManagerSlot slot : slots.values()) {
 			if (slot.getResourceProfile().isMatching(resourceProfile)) {
 				return true;
 			}
 		}
+
+		for (PendingTaskManagerSlot slot : pendingSlots.values()) {
+			if (slot.getResourceProfile().isMatching(resourceProfile)) {
+				return true;
+			}
+		}
+
 		return false;
 	}
 
 	private Optional<PendingTaskManagerSlot> allocateResource(ResourceProfile requestedSlotResourceProfile) {
+		if (getNumberPendingTaskManagerSlots() + getNumberRegisteredSlots() + numSlotsPerWorker > maxSlotNum) {

Review comment:
Could use a local variable for `getNumberPendingTaskManagerSlots() + getNumberRegisteredSlots()`.

########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java ##########
@@ -658,6 +666,38 @@ private PendingTaskManagerSlot findExactlyMatchingPendingTaskManagerSlot(Resourc
 		return null;
 	}
 
+	private boolean totalSlotNumExceedMaxLimitAfterRegister(SlotReport slotReport) {
+		int numRegisteredSlot = 0;
+		for (SlotStatus ignored : slotReport) {
+			numRegisteredSlot += 1;
+		}
+		// First check if the total number exceed before matching pending slot.
+		if (getNumberRegisteredSlots() + getNumberPendingTaskManagerSlots() + numRegisteredSlot > maxSlotNum) {
+			// Then, check how many exceed slots could be consumed by pending slot.
+			int numMatchingPendingTaskManagerSlots = getNumMatchingPendingTaskManagerSlots(slotReport);
+			int totalSlotNum = getNumberRegisteredSlots() + getNumberPendingTaskManagerSlots() + numRegisteredSlot - numMatchingPendingTaskManagerSlots;

Review comment:
I think it would be better to have `getNumNonPendingRegisteredSlots()` or something similar, rather than `numRegisteredSlot - getNumMatchingPendingTaskManagerSlots()`.

########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java ##########
@@ -808,16 +848,29 @@ private void fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequ
 		return Optional.empty();
 	}
 
-	private boolean isFulfillableByRegisteredSlots(ResourceProfile resourceProfile) {
+	private boolean isFulfillableByRegisteredOrPendingSlots(ResourceProfile resourceProfile) {
 		for (TaskManagerSlot slot : slots.values()) {
 			if (slot.getResourceProfile().isMatching(resourceProfile)) {
 				return true;
 			}
 		}
+
+		for (PendingTaskManagerSlot slot : pendingSlots.values()) {
+			if (slot.getResourceProfile().isMatching(resourceProfile)) {
+				return true;
+			}
+		}
+
 		return false;
 	}
 
 	private Optional<PendingTaskManagerSlot> allocateResource(ResourceProfile requestedSlotResourceProfile) {
+		if (getNumberPendingTaskManagerSlots() + getNumberRegisteredSlots() + numSlotsPerWorker > maxSlotNum) {
+			LOG.warn("Could not allocate {} more slots. The number of slots is {}, while the maximum is {}.",

Review comment:
```suggestion
			LOG.warn("Could not allocate {} more slots. The number of registered and pending slots is {}, while the maximum is {}.",
```
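A minimal sketch of how the remaining `SlotManagerImpl` suggestions could fit together, under simplifying assumptions: `PendingSlot` is a hypothetical stand-in for `PendingTaskManagerSlot`, resource profiles are plain strings compared with `equals` (as in the exact-match check under review), and `SlotAccounting`, `isExactlyMatching`, `getNumNonPendingRegisteredSlots` and `canAllocateWorker` are illustrative names, not existing Flink methods. The matching condition lives in one predicate that a `findExactlyMatchingPendingTaskManagerSlot`-style lookup could reuse, the `break` carries a comment naming the loop it exits, and the allocation check keeps the registered-plus-pending sum in a local variable.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for a pending slot: an ID plus a resource profile (simplified to a String). */
final class PendingSlot {
    final String slotId;
    final String resourceProfile;

    PendingSlot(String slotId, String resourceProfile) {
        this.slotId = slotId;
        this.resourceProfile = resourceProfile;
    }
}

final class SlotAccounting {
    private final List<PendingSlot> pendingSlots;

    SlotAccounting(List<PendingSlot> pendingSlots) {
        this.pendingSlots = new ArrayList<>(pendingSlots);
    }

    /** Extracted matching condition, shareable by every exact-match lookup over pending slots. */
    private static boolean isExactlyMatching(PendingSlot pendingSlot, String reportedProfile, Set<String> alreadyMatched) {
        return !alreadyMatched.contains(pendingSlot.slotId)
            && pendingSlot.resourceProfile.equals(reportedProfile);
    }

    /** Counts reported slots that do NOT take over a pending slot, i.e. slots that grow the total. */
    int getNumNonPendingRegisteredSlots(List<String> reportedProfiles) {
        final Set<String> matchedPendingSlotIds = new HashSet<>();
        int numNonPending = 0;
        for (String reportedProfile : reportedProfiles) {
            boolean matched = false;
            for (PendingSlot pendingSlot : pendingSlots) {
                if (isExactlyMatching(pendingSlot, reportedProfile, matchedPendingSlotIds)) {
                    matchedPendingSlotIds.add(pendingSlot.slotId);
                    matched = true;
                    break; // pendingSlot loop: each reported slot consumes at most one pending slot
                }
            }
            if (!matched) {
                numNonPending++;
            }
        }
        return numNonPending;
    }

    /** Limit check before requesting a new worker, with the repeated sum held in a local variable. */
    static boolean canAllocateWorker(int numRegisteredSlots, int numPendingSlots, int numSlotsPerWorker, int maxSlotNum) {
        final int numRegisteredAndPendingSlots = numRegisteredSlots + numPendingSlots;
        return numRegisteredAndPendingSlots + numSlotsPerWorker <= maxSlotNum;
    }
}
```

With such a helper, the registration check reduces to comparing `getNumberRegisteredSlots() + getNumberPendingTaskManagerSlots() + getNumNonPendingRegisteredSlots(...)` against `maxSlotNum`, without the subtraction the reviewer pointed out.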