xintongsong commented on a change in pull request #11615:
URL: https://github.com/apache/flink/pull/11615#discussion_r411062867



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
##########
@@ -64,7 +66,9 @@ public SlotManagerConfiguration(
                this.slotMatchingStrategy = Preconditions.checkNotNull(slotMatchingStrategy);
                this.defaultWorkerResourceSpec = Preconditions.checkNotNull(defaultWorkerResourceSpec);
                Preconditions.checkState(numSlotsPerWorker > 0);
+               Preconditions.checkState(maxSlotNum > 0);
                this.numSlotsPerWorker = numSlotsPerWorker;
+               this.maxSlotNum = maxSlotNum;

Review comment:
       nit: this might be more readable:
   ```suggestion
                this.numSlotsPerWorker = numSlotsPerWorker;
                this.maxSlotNum = maxSlotNum;
                Preconditions.checkState(numSlotsPerWorker > 0);
                Preconditions.checkState(maxSlotNum > 0);
   ```

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
##########
@@ -76,6 +77,16 @@
                .withDeprecatedKeys("yarn.heap-cutoff-min")
                .withDescription("Minimum amount of heap memory to remove in Job Master containers, as a safety margin.");
 
+       @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
+       public static final ConfigOption<Integer> MAX_SLOT_NUM = ConfigOptions
+               .key("slotmanager.number-of-slots.max")
+               .intType()
+               .defaultValue(Integer.MAX_VALUE)
+               .withDescription("Defines the maximum number of slots that the Flink cluster allocates. This configuration option " +
+                       "is meant for limiting the memory consumption for batch workloads. It is not recommended to configure this option " +
+                       "for streaming workloads, which may fail if there are not enough slots. Note that this configuration option does not take" +
+                       " effect for standalone clusters, where how many slots are allocated is not controlled by Flink.");

Review comment:
       nit: to align with the code style used elsewhere, keep the separating space at the end of the previous line rather than at the start of the next one.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
##########
@@ -50,14 +50,15 @@
                        ClusterInformation clusterInformation,
                        @Nullable String webInterfaceUrl,
                        ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception {
+               Configuration configForStandaloneResourceManager = ConfigurationUtils.getConfigurationForStandaloneResourceManager(configuration);

Review comment:
       ```suggestion
                final Configuration effectiveConfiguration = ConfigurationUtils.getConfigurationForStandaloneResourceManager(configuration);
   ```

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
##########
@@ -109,6 +114,21 @@ public static Time getStandaloneClusterStartupPeriodTime(Configuration configura
                return timeout;
        }
 
+       /**
+        * Get the configuration for standalone ResourceManager, overwrite invalid configs.
+        *
+        * @param configuration configuration object
+        * @return the configuration for standalone ResourceManager
+        */
+       public static Configuration getConfigurationForStandaloneResourceManager(Configuration configuration) {

Review comment:
       Not sure about putting this method in `ConfigurationUtils`.
   It doesn't seem to be reused anywhere other than `StandaloneResourceManagerFactory`. Could it be a static method in `StandaloneResourceManagerFactory` instead?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -658,6 +666,38 @@ private PendingTaskManagerSlot findExactlyMatchingPendingTaskManagerSlot(Resourc
                return null;
        }
 
+       private boolean totalSlotNumExceedMaxLimitAfterRegister(SlotReport slotReport) {
+               int numRegisteredSlot = 0;
+               for (SlotStatus ignored : slotReport) {
+                       numRegisteredSlot += 1;
+               }
+               // First check if the total number exceed before matching pending slot.
+               if (getNumberRegisteredSlots() + getNumberPendingTaskManagerSlots() + numRegisteredSlot > maxSlotNum) {

Review comment:
       IIUC, this `if` is essentially a shortcut that avoids computing the number of registered slots that match pending slots in most cases. I would suggest making that more explicit and avoiding nested `if` branches, as follows:
   ```
   if (getNumberRegisteredSlots() + getNumberPendingTaskManagerSlots() + numRegisteredSlot <= maxSlotNum) {
       return false;
   }
   ```
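   A minimal sketch of the early-return shape, assuming the registered and pending slot counts are plain `int` fields and the incoming report has already been reduced to a slot count. `SlotLimitCheckSketch` and the `IntSupplier` parameter are hypothetical stand-ins for `SlotManagerImpl` and `getNumMatchingPendingTaskManagerSlots(slotReport)`; the supplier makes it visible that the matching is computed only when the shortcut does not apply.

   ```java
   import java.util.function.IntSupplier;

   /** Hypothetical stand-in for the slot-limit check in SlotManagerImpl. */
   final class SlotLimitCheckSketch {
       private final int maxSlotNum;
       private final int numRegisteredSlots; // stands in for getNumberRegisteredSlots()
       private final int numPendingSlots;    // stands in for getNumberPendingTaskManagerSlots()

       SlotLimitCheckSketch(int maxSlotNum, int numRegisteredSlots, int numPendingSlots) {
           this.maxSlotNum = maxSlotNum;
           this.numRegisteredSlots = numRegisteredSlots;
           this.numPendingSlots = numPendingSlots;
       }

       /**
        * @param numReportedSlots        number of slots in the incoming slot report
        * @param numMatchingPendingSlots lazily computes how many reported slots would
        *                                take the place of an existing pending slot
        */
       boolean totalSlotNumExceedMaxLimitAfterRegister(int numReportedSlots, IntSupplier numMatchingPendingSlots) {
           final int numSlotsIfNothingMatches = numRegisteredSlots + numPendingSlots + numReportedSlots;

           // Shortcut: within the limit even if no reported slot matches a pending one.
           if (numSlotsIfNothingMatches <= maxSlotNum) {
               return false;
           }

           // Only now pay for matching reported slots against pending slots.
           return numSlotsIfNothingMatches - numMatchingPendingSlots.getAsInt() > maxSlotNum;
       }
   }
   ```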

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -658,6 +666,38 @@ private PendingTaskManagerSlot findExactlyMatchingPendingTaskManagerSlot(Resourc
                return null;
        }
 
+       private boolean totalSlotNumExceedMaxLimitAfterRegister(SlotReport slotReport) {
+               int numRegisteredSlot = 0;
+               for (SlotStatus ignored : slotReport) {
+                       numRegisteredSlot += 1;
+               }

Review comment:
       I would suggest adding a method `SlotReport.getNumSlotStatus()`.
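   A minimal sketch of the suggested accessor, assuming `SlotReport` keeps its statuses in a `Collection` and is `Iterable` over them. `SlotReportSketch` and `SlotStatusStub` are hypothetical stand-ins, not Flink classes. With such a method, the counting loop in `totalSlotNumExceedMaxLimitAfterRegister` would collapse to `int numRegisteredSlot = slotReport.getNumSlotStatus();`.

   ```java
   import java.util.Collection;
   import java.util.Collections;
   import java.util.Iterator;
   import java.util.Objects;

   /** Hypothetical stand-in for Flink's SlotStatus; only an id is kept here. */
   final class SlotStatusStub {
       final String slotId;

       SlotStatusStub(String slotId) {
           this.slotId = Objects.requireNonNull(slotId);
       }
   }

   /** Hypothetical stand-in for SlotReport, assumed to wrap a collection of statuses. */
   final class SlotReportSketch implements Iterable<SlotStatusStub> {
       private final Collection<SlotStatusStub> slotsStatus;

       SlotReportSketch(Collection<SlotStatusStub> slotsStatus) {
           this.slotsStatus = Objects.requireNonNull(slotsStatus);
       }

       /** Suggested accessor: asks the collection for its size instead of counting via iteration. */
       int getNumSlotStatus() {
           return slotsStatus.size();
       }

       @Override
       public Iterator<SlotStatusStub> iterator() {
           // Read-only view, so callers cannot mutate the report through the iterator.
           return Collections.unmodifiableCollection(slotsStatus).iterator();
       }
   }
   ```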

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
##########
@@ -109,6 +114,21 @@ public static Time getStandaloneClusterStartupPeriodTime(Configuration configura
                return timeout;
        }
 
+       /**
+        * Get the configuration for standalone ResourceManager, overwrite invalid configs.
+        *
+        * @param configuration configuration object
+        * @return the configuration for standalone ResourceManager
+        */
+       public static Configuration getConfigurationForStandaloneResourceManager(Configuration configuration) {
+               // The max slot limit should not take effect for standalone cluster, we overwrite the configure in case user
+               // sets this value by mistake.
+               LOG.warn("The {} should not take effect for standalone cluster, If configured, it will be ignored.", ResourceManagerOptions.MAX_SLOT_NUM.key());

Review comment:
       I think this warning should be printed only if `MAX_SLOT_NUM` is explicitly configured.
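   A minimal sketch of warning only when the option is actually set. It uses a plain `Map<String, String>` as a hypothetical stand-in for Flink's `Configuration` and `java.util.logging` instead of SLF4J so that it runs on its own; in the real code the check would presumably go through `configuration.contains(ResourceManagerOptions.MAX_SLOT_NUM)`. `StandaloneConfigSketch` is a hypothetical name. The method works on a copy, so the caller's configuration is left untouched.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.logging.Logger;

   /** Hypothetical stand-in; the real code would use Flink's Configuration and SLF4J. */
   final class StandaloneConfigSketch {
       private static final Logger LOG = Logger.getLogger(StandaloneConfigSketch.class.getName());

       /** Key of the MAX_SLOT_NUM option introduced in this PR. */
       static final String MAX_SLOT_NUM_KEY = "slotmanager.number-of-slots.max";

       static Map<String, String> getConfigurationForStandaloneResourceManager(Map<String, String> configuration) {
           // Work on a copy so the caller's configuration is not modified.
           final Map<String, String> effectiveConfiguration = new HashMap<>(configuration);

           if (effectiveConfiguration.containsKey(MAX_SLOT_NUM_KEY)) {
               // Only warn if the user actually set the option.
               LOG.warning("The option " + MAX_SLOT_NUM_KEY
                       + " does not take effect for standalone clusters and will be ignored.");
               // Overwrite with the unlimited default.
               effectiveConfiguration.put(MAX_SLOT_NUM_KEY, String.valueOf(Integer.MAX_VALUE));
           }
           return effectiveConfiguration;
       }
   }
   ```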

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -658,6 +666,38 @@ private PendingTaskManagerSlot findExactlyMatchingPendingTaskManagerSlot(Resourc
                return null;
        }
 
+       private boolean totalSlotNumExceedMaxLimitAfterRegister(SlotReport slotReport) {
+               int numRegisteredSlot = 0;
+               for (SlotStatus ignored : slotReport) {
+                       numRegisteredSlot += 1;
+               }
+               // First check if the total number exceed before matching pending slot.
+               if (getNumberRegisteredSlots() + getNumberPendingTaskManagerSlots() + numRegisteredSlot > maxSlotNum) {

Review comment:
       We could also use local variables to avoid duplicate calls to `getNumberRegisteredSlots()` and `getNumberPendingTaskManagerSlots()`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -658,6 +666,38 @@ private PendingTaskManagerSlot findExactlyMatchingPendingTaskManagerSlot(Resourc
                return null;
        }
 
+       private boolean totalSlotNumExceedMaxLimitAfterRegister(SlotReport slotReport) {
+               int numRegisteredSlot = 0;
+               for (SlotStatus ignored : slotReport) {
+                       numRegisteredSlot += 1;
+               }
+               // First check if the total number exceed before matching pending slot.
+               if (getNumberRegisteredSlots() + getNumberPendingTaskManagerSlots() + numRegisteredSlot > maxSlotNum) {
+                       // Then, check how many exceed slots could be consumed by pending slot.
+                       int numMatchingPendingTaskManagerSlots = getNumMatchingPendingTaskManagerSlots(slotReport);
+                       int totalSlotNum = getNumberRegisteredSlots() + getNumberPendingTaskManagerSlots() + numRegisteredSlot - numMatchingPendingTaskManagerSlots;
+                       if (totalSlotNum > maxSlotNum) {
+                               return true;
+                       }
+               }
+               return false;
+       }
+
+       private int getNumMatchingPendingTaskManagerSlots(SlotReport slotReport) {
+               Set<TaskManagerSlotId> matchingPendingSlots = new HashSet<>();
+
+               for (SlotStatus slotStatus : slotReport) {
+                       for (PendingTaskManagerSlot pendingTaskManagerSlot : pendingSlots.values()) {
+                               if (!matchingPendingSlots.contains(pendingTaskManagerSlot.getTaskManagerSlotId()) &&
+                                       pendingTaskManagerSlot.getResourceProfile().equals(slotStatus.getResourceProfile())) {

Review comment:
       I would suggest extracting a method for this condition, to avoid duplicating code with `findExactlyMatchingPendingTaskManagerSlot`.
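   A minimal sketch of the extraction, assuming both call sites compare a pending slot's resource profile using `equals`. `PendingSlotStub`, `PendingSlotMatchingSketch`, and `isPendingSlotMatchingExactly` are hypothetical names, and resource profiles are modeled as `String`s. `getNumMatchingPendingTaskManagerSlots` would call the same predicate in its inner loop.

   ```java
   import java.util.List;
   import java.util.Objects;

   /** Hypothetical stand-in for PendingTaskManagerSlot; the profile is a String here. */
   final class PendingSlotStub {
       final String slotId;
       final String resourceProfile;

       PendingSlotStub(String slotId, String resourceProfile) {
           this.slotId = Objects.requireNonNull(slotId);
           this.resourceProfile = Objects.requireNonNull(resourceProfile);
       }
   }

   final class PendingSlotMatchingSketch {
       /** Extracted condition, shared by every place that looks for an exact match. */
       static boolean isPendingSlotMatchingExactly(PendingSlotStub pendingSlot, String resourceProfile) {
           return pendingSlot.resourceProfile.equals(resourceProfile);
       }

       /** Counterpart of findExactlyMatchingPendingTaskManagerSlot, reusing the predicate. */
       static PendingSlotStub findExactlyMatchingPendingSlot(List<PendingSlotStub> pendingSlots, String resourceProfile) {
           for (PendingSlotStub pendingSlot : pendingSlots) {
               if (isPendingSlotMatchingExactly(pendingSlot, resourceProfile)) {
                   return pendingSlot;
               }
           }
           return null;
       }
   }
   ```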

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -658,6 +666,38 @@ private PendingTaskManagerSlot findExactlyMatchingPendingTaskManagerSlot(Resourc
                return null;
        }
 
+       private boolean totalSlotNumExceedMaxLimitAfterRegister(SlotReport slotReport) {
+               int numRegisteredSlot = 0;
+               for (SlotStatus ignored : slotReport) {
+                       numRegisteredSlot += 1;
+               }
+               // First check if the total number exceed before matching pending slot.
+               if (getNumberRegisteredSlots() + getNumberPendingTaskManagerSlots() + numRegisteredSlot > maxSlotNum) {
+                       // Then, check how many exceed slots could be consumed by pending slot.
+                       int numMatchingPendingTaskManagerSlots = getNumMatchingPendingTaskManagerSlots(slotReport);
+                       int totalSlotNum = getNumberRegisteredSlots() + getNumberPendingTaskManagerSlots() + numRegisteredSlot - numMatchingPendingTaskManagerSlots;
+                       if (totalSlotNum > maxSlotNum) {
+                               return true;
+                       }
+               }
+               return false;
+       }
+
+       private int getNumMatchingPendingTaskManagerSlots(SlotReport slotReport) {
+               Set<TaskManagerSlotId> matchingPendingSlots = new HashSet<>();
+
+               for (SlotStatus slotStatus : slotReport) {
+                       for (PendingTaskManagerSlot pendingTaskManagerSlot : pendingSlots.values()) {
+                               if (!matchingPendingSlots.contains(pendingTaskManagerSlot.getTaskManagerSlotId()) &&
+                                       pendingTaskManagerSlot.getResourceProfile().equals(slotStatus.getResourceProfile())) {
+                                       matchingPendingSlots.add(pendingTaskManagerSlot.getTaskManagerSlotId());
+                                       break;

Review comment:
       In general, I would suggest avoiding nested for-loops where possible. It might be OK in this particular case, but then I would suggest adding a comment after the `break` for better readability.
   ```suggestion
                                        break; // pendingTaskManagerSlot loop
   ```
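   If the nested loop turns out to be worth avoiding, one option is to move the inner search into its own method, which also removes the need for a `break`. The sketch below assumes resource profiles can be compared with `equals` and models them as `String`s; `MatchingCountSketch`, `PendingSlot`, and `findUnmatchedPendingSlot` are hypothetical names. It keeps the greedy first-match behavior of the PR code.

   ```java
   import java.util.HashSet;
   import java.util.List;
   import java.util.Optional;
   import java.util.Set;

   final class MatchingCountSketch {
       /** Hypothetical pending slot: an id plus a resource profile encoded as a String. */
       static final class PendingSlot {
           final String id;
           final String profile;

           PendingSlot(String id, String profile) {
               this.id = id;
               this.profile = profile;
           }
       }

       static int getNumMatchingPendingSlots(List<String> reportedProfiles, List<PendingSlot> pendingSlots) {
           final Set<String> matchedIds = new HashSet<>();
           for (String profile : reportedProfiles) {
               // The inner search lives in its own method: no nested loop, no break.
               findUnmatchedPendingSlot(profile, pendingSlots, matchedIds)
                       .ifPresent(slot -> matchedIds.add(slot.id));
           }
           return matchedIds.size();
       }

       /** First pending slot with an equal profile that no reported slot has claimed yet. */
       private static Optional<PendingSlot> findUnmatchedPendingSlot(
               String profile, List<PendingSlot> pendingSlots, Set<String> matchedIds) {
           for (PendingSlot slot : pendingSlots) {
               if (!matchedIds.contains(slot.id) && slot.profile.equals(profile)) {
                   return Optional.of(slot);
               }
           }
           return Optional.empty();
       }
   }
   ```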

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java
##########
@@ -1540,6 +1555,85 @@ public void testSpreadOutSlotAllocationStrategy() throws Exception {
                }
        }
 
+       /**
+        * Test that the slot manager respect the max limitation of the number of slots when allocate new resource.
+        */
+       @Test
+       public void testMaxSlotLimitAllocateResource() throws Exception {
+               final int numberSlots = 1;
+               final int maxSlotNum = 1;
+
+               final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
+               final JobID jobId = new JobID();
+
+               final AtomicInteger resourceRequests = new AtomicInteger(0);
+               ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
+                       .setAllocateResourceFunction(
+                               ignored -> {
+                                       resourceRequests.incrementAndGet();
+                                       return true;
+                               })
+                       .build();
+
+               final Configuration configuration = new Configuration();
+               configuration.setInteger(ResourceManagerOptions.MAX_SLOT_NUM, maxSlotNum);
+               configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numberSlots);
+
+               try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions, configuration)) {
+
+                       assertTrue("The slot request should be accepted", slotManager.registerSlotRequest(createSlotRequest(jobId)));
+                       assertThat(resourceRequests.get(), is(1));
+
+                       // The second slot request should not try to allocate a new resource because of the max limitation.
+                       assertTrue("The slot request should be accepted", slotManager.registerSlotRequest(createSlotRequest(jobId)));
+                       assertThat(resourceRequests.get(), is(1));
+               }
+       }
+
+       /**
+        * Test that the slot manager release resource when the number of slots exceed max limit when new TaskExecutor registered.
+        */
+       @Test
+       public void testMaxSlotLimitRegisterResource() throws Exception {
+               final int numberSlots = 1;
+               final int maxSlotNum = 1;
+               final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
+
+               final CompletableFuture<InstanceID> releasedResourceFuture = new CompletableFuture<>();
+               ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
+                       .setReleaseResourceConsumer((instanceID, e) -> releasedResourceFuture.complete(instanceID))
+                       .build();
+
+               final Configuration configuration = new Configuration();
+               configuration.setInteger(ResourceManagerOptions.MAX_SLOT_NUM, maxSlotNum);
+               configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numberSlots);
+
+               final TaskExecutorGateway taskExecutorGateway1 = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
+               final TaskExecutorGateway taskExecutorGateway2 = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
+               final ResourceID resourceId1 = ResourceID.generate();
+               final ResourceID resourceId2 = ResourceID.generate();
+               final TaskExecutorConnection taskManagerConnection1 = new TaskExecutorConnection(resourceId1, taskExecutorGateway1);
+               final TaskExecutorConnection taskManagerConnection2 = new TaskExecutorConnection(resourceId2, taskExecutorGateway2);
+
+               final SlotID slotId1 = new SlotID(resourceId1, 0);
+               final SlotID slotId2 = new SlotID(resourceId1, 0);
+               final SlotStatus slotStatus1 = new SlotStatus(slotId1, ResourceProfile.UNKNOWN);
+               final SlotStatus slotStatus2 = new SlotStatus(slotId2, ResourceProfile.UNKNOWN);
+               final SlotReport slotReport1 = new SlotReport(Collections.singletonList(slotStatus1));
+               final SlotReport slotReport2 = new SlotReport(Collections.singletonList(slotStatus2));
+
+               try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions, configuration)) {
+                       slotManager.registerTaskManager(taskManagerConnection1, slotReport1);
+                       slotManager.registerTaskManager(taskManagerConnection2, slotReport2);
+
+                       assertThat("The number registered slots does not equal the expected number.", slotManager.getNumberRegisteredSlots(), is(equalTo(1)));

Review comment:
       ```suggestion
                        assertThat("The number registered slots does not equal the expected number.", slotManager.getNumberRegisteredSlots(), is(1));
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -808,16 +848,29 @@ private void fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequ
                return Optional.empty();
        }
 
-       private boolean isFulfillableByRegisteredSlots(ResourceProfile resourceProfile) {
+       private boolean isFulfillableByRegisteredOrPendingSlots(ResourceProfile resourceProfile) {
                for (TaskManagerSlot slot : slots.values()) {
                        if (slot.getResourceProfile().isMatching(resourceProfile)) {
                                return true;
                        }
                }
+
+               for (PendingTaskManagerSlot slot : pendingSlots.values()) {
+                       if (slot.getResourceProfile().isMatching(resourceProfile)) {
+                               return true;
+                       }
+               }
+
                return false;
        }
 
        private Optional<PendingTaskManagerSlot> allocateResource(ResourceProfile requestedSlotResourceProfile) {
+               if (getNumberPendingTaskManagerSlots() + getNumberRegisteredSlots() + numSlotsPerWorker > maxSlotNum) {

Review comment:
       Could use a local variable for `getNumberPendingTaskManagerSlots() + getNumberRegisteredSlots()`.
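   A minimal sketch of the guard with a single local variable, which can also feed the log message. `AllocateResourceGuardSketch` is hypothetical: the counts are plain fields, `java.util.logging` replaces SLF4J, it returns a `boolean` instead of `Optional<PendingTaskManagerSlot>`, and it assumes the first placeholder of the warning is `numSlotsPerWorker`.

   ```java
   import java.util.logging.Logger;

   /** Hypothetical stand-in for the max-slot guard in allocateResource. */
   final class AllocateResourceGuardSketch {
       private static final Logger LOG = Logger.getLogger(AllocateResourceGuardSketch.class.getName());

       private final int maxSlotNum;
       private final int numSlotsPerWorker;
       private final int numRegisteredSlots;
       private int numPendingSlots;

       AllocateResourceGuardSketch(int maxSlotNum, int numSlotsPerWorker, int numRegisteredSlots, int numPendingSlots) {
           this.maxSlotNum = maxSlotNum;
           this.numSlotsPerWorker = numSlotsPerWorker;
           this.numRegisteredSlots = numRegisteredSlots;
           this.numPendingSlots = numPendingSlots;
       }

       /** Returns true if a new worker would be requested, false if the limit forbids it. */
       boolean tryAllocateWorker() {
           // Computed once, then used by both the check and the warning.
           final int numRegisteredAndPendingSlots = numPendingSlots + numRegisteredSlots;

           if (numRegisteredAndPendingSlots + numSlotsPerWorker > maxSlotNum) {
               LOG.warning(String.format(
                       "Could not allocate %d more slots. The number of registered and pending slots is %d, while the maximum is %d.",
                       numSlotsPerWorker, numRegisteredAndPendingSlots, maxSlotNum));
               return false;
           }

           // A newly requested worker contributes numSlotsPerWorker pending slots.
           numPendingSlots += numSlotsPerWorker;
           return true;
       }

       int getNumPendingSlots() {
           return numPendingSlots;
       }
   }
   ```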

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -658,6 +666,38 @@ private PendingTaskManagerSlot findExactlyMatchingPendingTaskManagerSlot(Resourc
                return null;
        }
 
+       private boolean totalSlotNumExceedMaxLimitAfterRegister(SlotReport slotReport) {
+               int numRegisteredSlot = 0;
+               for (SlotStatus ignored : slotReport) {
+                       numRegisteredSlot += 1;
+               }
+               // First check if the total number exceed before matching pending slot.
+               if (getNumberRegisteredSlots() + getNumberPendingTaskManagerSlots() + numRegisteredSlot > maxSlotNum) {
+                       // Then, check how many exceed slots could be consumed by pending slot.
+                       int numMatchingPendingTaskManagerSlots = getNumMatchingPendingTaskManagerSlots(slotReport);
+                       int totalSlotNum = getNumberRegisteredSlots() + getNumberPendingTaskManagerSlots() + numRegisteredSlot - numMatchingPendingTaskManagerSlots;

Review comment:
       I think it would be better to have a method like `getNumNonPendingRegisteredSlots()` rather than computing `numRegisteredSlot - getNumMatchingPendingTaskManagerSlots()`.
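   A minimal sketch of such a method, with hypothetical names and resource profiles modeled as `String`s compared by `equals`. Instead of the PR's nested loop it counts unclaimed pending slots per profile in a `HashMap`. Because matching is by exact equality, any reported slot may claim any pending slot with the same profile, so this should give the same count as `numRegisteredSlot - numMatchingPendingTaskManagerSlots`, and the limit check reads as a plain sum.

   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   /** Hypothetical helpers; resource profiles are modeled as Strings. */
   final class NonPendingSlotCountSketch {
       /** Number of reported slots that do not take the place of an existing pending slot. */
       static int getNumNonPendingRegisteredSlots(List<String> reportedProfiles, List<String> pendingProfiles) {
           // Multiset of pending slots that have not been claimed yet, keyed by profile.
           final Map<String, Integer> unclaimedPending = new HashMap<>();
           for (String profile : pendingProfiles) {
               unclaimedPending.merge(profile, 1, Integer::sum);
           }

           int numNonPending = 0;
           for (String profile : reportedProfiles) {
               final int remaining = unclaimedPending.getOrDefault(profile, 0);
               if (remaining > 0) {
                   // This reported slot replaces a pending slot: the total does not grow.
                   unclaimedPending.put(profile, remaining - 1);
               } else {
                   // No pending slot left for this profile: the total grows by one.
                   numNonPending++;
               }
           }
           return numNonPending;
       }

       static boolean totalSlotNumExceedMaxLimitAfterRegister(
               int numRegisteredSlots, List<String> pendingProfiles, List<String> reportedProfiles, int maxSlotNum) {
           return numRegisteredSlots + pendingProfiles.size()
                   + getNumNonPendingRegisteredSlots(reportedProfiles, pendingProfiles) > maxSlotNum;
       }
   }
   ```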

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -808,16 +848,29 @@ private void fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequ
                return Optional.empty();
        }
 
-       private boolean isFulfillableByRegisteredSlots(ResourceProfile resourceProfile) {
+       private boolean isFulfillableByRegisteredOrPendingSlots(ResourceProfile resourceProfile) {
                for (TaskManagerSlot slot : slots.values()) {
                        if (slot.getResourceProfile().isMatching(resourceProfile)) {
                                return true;
                        }
                }
+
+               for (PendingTaskManagerSlot slot : pendingSlots.values()) {
+                       if (slot.getResourceProfile().isMatching(resourceProfile)) {
+                               return true;
+                       }
+               }
+
                return false;
        }
 
        private Optional<PendingTaskManagerSlot> allocateResource(ResourceProfile requestedSlotResourceProfile) {
+               if (getNumberPendingTaskManagerSlots() + getNumberRegisteredSlots() + numSlotsPerWorker > maxSlotNum) {
+                       LOG.warn("Could not allocate {} more slots. The number of slots is {}, while the maximum is {}.",

Review comment:
       ```suggestion
                        LOG.warn("Could not allocate {} more slots. The number of registered and pending slots is {}, while the maximum is {}.",
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

