xintongsong commented on a change in pull request #15047: URL: https://github.com/apache/flink/pull/15047#discussion_r584475304
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java ########## @@ -594,6 +608,30 @@ public int getNumberPendingSlotRequests() { // Internal periodic check methods // --------------------------------------------------------------------------------------------- + private void triggerCheckResourceRequirements() { + if (resourceRequirementsCheck == null) { + resourceRequirementsCheck = + scheduledExecutor.schedule( Review comment: Can we decide whether to schedule a new check from the previous `ScheduledFuture.isDone`? It would be nice to avoid having `null` valued fields if possible, at least during the running state (after initialized and before suspended/closed). ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java ########## @@ -594,6 +608,30 @@ public int getNumberPendingSlotRequests() { // Internal periodic check methods // --------------------------------------------------------------------------------------------- + private void triggerCheckResourceRequirements() { + if (resourceRequirementsCheck == null) { + resourceRequirementsCheck = + scheduledExecutor.schedule( + () -> + mainThreadExecutor.execute( + () -> { + checkResourceRequirements(); + resourceRequirementsCheck = null; + }), + requirementsCheckDelay.toMilliseconds(), + TimeUnit.MILLISECONDS); + } + } + + @VisibleForTesting + void manuallyTriggerCheckResourceRequirementsIfNeeded() { Review comment: If we pass in the delay as an argument, we might not need this. Instead, we can pass in a `0` delay. Additionally, we can leverage `ManuallyTriggeredScheduledExecutor` if needed. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java ########## @@ -64,6 +64,9 @@ public class FineGrainedSlotManager implements SlotManager { private static final Logger LOG = LoggerFactory.getLogger(FineGrainedSlotManager.class); + // We currently make the delay of requirements check a constant time. This delay might be + // configurable by user in the future. + private static final long REQUIREMENTS_CHECK_DELAY_MS = 50L; Review comment: I'm ok with not exposing this to user as the first step. However, I would suggest to move the constant to `ResourceManagerRuntimeServices`, so that we can pass different values for tests. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org