1996fanrui commented on code in PR #25113:
URL: https://github.com/apache/flink/pull/25113#discussion_r1689012554


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractSlotPoolServiceFactory.java:
##########
@@ -34,14 +36,18 @@ public abstract class AbstractSlotPoolServiceFactory 
implements SlotPoolServiceF
 
     @Nonnull protected final Time batchSlotTimeout;
 
+    @Nonnull protected final Duration slotRequestMaxInterval;
+
     protected AbstractSlotPoolServiceFactory(
             @Nonnull Clock clock,
             @Nonnull Time rpcTimeout,
             @Nonnull Time slotIdleTimeout,
-            @Nonnull Time batchSlotTimeout) {
+            @Nonnull Time batchSlotTimeout,

Review Comment:
   `Time` has been deprecated. Would you mind refactoring `Time` to `Duration` 
in a hotfix commit that comes before your main commit in this PR?
   
   Note: it is enough to refactor only the classes that use `Time` and are related to this PR.
   
   This helps avoid code conflicts in the future, and it makes removing the deprecated API easier.
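
A minimal sketch of what such a hotfix could look like, assuming nothing about the real Flink classes: `LegacyTime` and `SlotPoolTimeouts` are hypothetical stand-ins invented for illustration. The idea is that the constructor accepts only `java.time.Duration`, and any caller that still holds the old type converts at the boundary.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a deprecated (size, unit) time type; NOT Flink's class. */
final class LegacyTime {
    private final long size;
    private final TimeUnit unit;

    private LegacyTime(long size, TimeUnit unit) {
        this.size = size;
        this.unit = unit;
    }

    static LegacyTime seconds(long seconds) {
        return new LegacyTime(seconds, TimeUnit.SECONDS);
    }

    /** Bridge for call sites that have not been migrated yet. */
    Duration toDuration() {
        return Duration.ofMillis(unit.toMillis(size));
    }
}

/** Hypothetical holder after the hotfix: every timeout is a java.time.Duration. */
final class SlotPoolTimeouts {
    final Duration rpcTimeout;
    final Duration slotIdleTimeout;
    final Duration batchSlotTimeout;
    final Duration slotRequestMaxInterval;

    SlotPoolTimeouts(
            Duration rpcTimeout,
            Duration slotIdleTimeout,
            Duration batchSlotTimeout,
            Duration slotRequestMaxInterval) {
        this.rpcTimeout = rpcTimeout;
        this.slotIdleTimeout = slotIdleTimeout;
        this.batchSlotTimeout = batchSlotTimeout;
        this.slotRequestMaxInterval = slotRequestMaxInterval;
    }
}
```

With the refactoring isolated in its own commit, the feature commit only has to add the new `Duration` argument instead of mixing two time types in one signature.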



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -1115,6 +1116,11 @@ public CompletableFuture<Acknowledge> 
updateJobResourceRequirements(
         return CompletableFuture.completedFuture(Acknowledge.get());
     }
 
+    @VisibleForTesting
+    public SlotPoolService getSlotPoolService() {

Review Comment:
   As I understand it, this method is exposed only for 
`testSlotRequestMaxIntervalAndComponentMainThreadExecutorPassing`, right?
   
   I don't think we need to test parameter passing. If we tested every passed parameter, the tests would become very bloated. And why are the other parameters not tested?
   
   Instead, the Flink community prefers black-box testing: you only need to test that your functionality meets expectations, without paying attention to implementation details.
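
To illustrate the black-box style, here is a small self-contained sketch. `BatchingDeclarer` is a hypothetical class made up for this example, not part of Flink. The check never reads the configured interval back through a getter; it only observes which declarations the component emits.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical component: declares requests at once unless a batching interval is set. */
final class BatchingDeclarer {
    private final Duration maxInterval;
    // One entry per declaration that was actually sent out.
    private final List<Integer> declared = new ArrayList<>();
    private int pending;

    BatchingDeclarer(Duration maxInterval) {
        this.maxInterval = maxInterval;
    }

    void request(int slots) {
        pending += slots;
        if (maxInterval.isZero()) {
            // Batching disabled: declare immediately.
            flush();
        }
    }

    /** A timer would call this in a real system; the sketch lets the caller trigger it. */
    void flush() {
        if (pending > 0) {
            declared.add(pending);
            pending = 0;
        }
    }

    /** Observable behavior: a copy of the declarations sent so far. */
    List<Integer> declarations() {
        return new ArrayList<>(declared);
    }
}
```

A test for the zero interval would assert that two requests produce two declarations, and a test for a non-zero interval would assert that they are merged into one after the flush. Neither needs a `@VisibleForTesting` getter for the interval.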



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java:
##########
@@ -81,18 +84,26 @@ public DeclarativeSlotPoolService(
             DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
             Clock clock,
             Time idleSlotTimeout,
-            Time rpcTimeout) {
+            Time rpcTimeout,
+            Duration slotRequestMaxInterval,
+            @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) {
         this.jobId = jobId;
         this.clock = clock;
         this.rpcTimeout = rpcTimeout;
         this.registeredTaskManagers = new HashSet<>();
 
         this.declarativeSlotPool =
                 declarativeSlotPoolFactory.create(
-                        jobId, this::declareResourceRequirements, 
idleSlotTimeout, rpcTimeout);
+                        jobId,
+                        this::declareResourceRequirements,
+                        idleSlotTimeout,
+                        rpcTimeout,
+                        slotRequestMaxInterval,
+                        componentMainThreadExecutor);
     }
 
-    protected DeclarativeSlotPool getDeclarativeSlotPool() {
+    @VisibleForTesting
+    public DeclarativeSlotPool getDeclarativeSlotPool() {

Review Comment:
   This change can be reverted if we don't need 
`JobMasterTest#testSlotRequestMaxIntervalAndComponentMainThreadExecutorPassing`.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java:
##########
@@ -360,7 +360,9 @@ private DeclarativeSlotPoolService 
createDeclarativeSlotPoolService(
                         declarativeSlotPoolFactory,
                         SystemClock.getInstance(),
                         Time.seconds(20L),
-                        Time.seconds(20L));
+                        Time.seconds(20L),
+                        Duration.ZERO,
+                        forMainThread());
 
         declarativeSlotPoolService.start(jobMasterId, address, 
mainThreadExecutor);

Review Comment:
   ```suggestion
                           mainThreadExecutor);
   
           declarativeSlotPoolService.start(jobMasterId, address, 
mainThreadExecutor);
   ```
   
   This test class already has a `mainThreadExecutor`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:
##########
@@ -103,12 +108,20 @@ public class DefaultDeclarativeSlotPool implements 
DeclarativeSlotPool {
 
     private final RequirementMatcher requirementMatcher = new 
DefaultRequirementMatcher();
 
+    @Nonnull private final ComponentMainThreadExecutor 
componentMainThreadExecutor;
+
+    // For slots(resources) requests by batch.
+    @Nonnull private final Duration slotRequestMaxInterval;
+    @Nullable private ScheduledFuture<?> slotRequestMaxIntervalTimeoutFuture;

Review Comment:
   ```suggestion
       @Nullable private ScheduledFuture<?> slotRequestFuture;
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:
##########
@@ -599,4 +632,27 @@ private ResourceCounter getFulfilledRequirements(
     ResourceCounter getFulfilledResourceRequirements() {
         return fulfilledResourceRequirements;
     }
+
+    @VisibleForTesting
+    @Nonnull
+    public ComponentMainThreadExecutor getComponentMainThreadExecutor() {
+        return componentMainThreadExecutor;
+    }
+
+    @VisibleForTesting
+    @Nonnull
+    public Duration getSlotRequestMaxInterval() {
+        return slotRequestMaxInterval;
+    }
+
+    @VisibleForTesting
+    public void tryWaitSlotRequestMaxIntervalTimeout() {

Review Comment:
   ```suggestion
       void tryWaitSlotRequestIsDone() {
   ```
   
   1. It doesn't need to be public.
   2. We are waiting for the slot request to be done rather than for a fixed timeout.
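
A minimal sketch of the difference, using only `java.util.concurrent`. `DelayedSlotRequest` and its members are hypothetical names for illustration and do not mirror the PR's actual implementation. The helper blocks on the scheduled future itself, so it returns as soon as the request has run rather than after a fixed sleep.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical component that declares slots after a delay. */
final class DelayedSlotRequest {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger declaredSlots = new AtomicInteger();
    // Null until a request has been scheduled.
    private ScheduledFuture<?> slotRequestFuture;

    void requestLater(int slots, long delayMillis) {
        slotRequestFuture =
                executor.schedule(
                        () -> {
                            declaredSlots.addAndGet(slots);
                        },
                        delayMillis,
                        TimeUnit.MILLISECONDS);
    }

    /** Package-private test helper: blocks until the scheduled request is done, if any. */
    void tryWaitSlotRequestIsDone() {
        if (slotRequestFuture == null) {
            return;
        }
        try {
            slotRequestFuture.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    int declaredSlots() {
        return declaredSlots.get();
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

Because the helper waits on completion, the name describes the condition being awaited, and the test does not depend on how long the configured interval is.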



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java:
##########
@@ -54,12 +55,11 @@ void 
testDeclarativeSlotPoolTakesPreferredAllocationsIntoAccount() throws Except
                         TestingUtils.infiniteTime(),
                         TestingUtils.infiniteTime(),
                         TestingUtils.infiniteTime(),
-                        
PreferredAllocationRequestSlotMatchingStrategy.INSTANCE);
+                        
PreferredAllocationRequestSlotMatchingStrategy.INSTANCE,
+                        Duration.ZERO,
+                        forMainThread());
 
-        declarativeSlotPoolBridge.start(
-                JobMasterId.generate(),
-                "localhost",
-                ComponentMainThreadExecutorServiceAdapter.forMainThread());
+        declarativeSlotPoolBridge.start(JobMasterId.generate(), "localhost", 
forMainThread());

Review Comment:
   There is no need to create 2 new thread executors here.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java:
##########
@@ -280,7 +279,9 @@ static DeclarativeSlotPoolBridge 
createDeclarativeSlotPoolBridge(
                 rpcTimeout,
                 Time.seconds(20),
                 Time.seconds(20),
-                requestSlotMatchingStrategy);
+                requestSlotMatchingStrategy,
+                Duration.ZERO,
+                forMainThread());

Review Comment:
   ```suggestion
                   mainThreadExecutor);
   ```
   
   Same comment as above: we don't need to create a separate thread executor each time; reusing `mainThreadExecutor` is enough.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java:
##########
@@ -81,18 +84,26 @@ public DeclarativeSlotPoolService(
             DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
             Clock clock,
             Time idleSlotTimeout,
-            Time rpcTimeout) {
+            Time rpcTimeout,
+            Duration slotRequestMaxInterval,
+            @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) {

Review Comment:
   We have already added the main thread executor as a constructor parameter, so I don't think `SlotPoolService#start` needs the `ComponentMainThreadExecutor mainThreadExecutor` parameter anymore.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java:
##########
@@ -299,14 +310,22 @@ public BlocklistDeclarativeSlotPoolBuilder 
setBlockedTaskManagerChecker(
             return this;
         }
 
-        public BlocklistDeclarativeSlotPool build() {
+        public BlocklistDeclarativeSlotPool build(
+                Duration slotRequestMaxInterval,
+                ComponentMainThreadExecutor componentMainThreadExecutor) {
             return new BlocklistDeclarativeSlotPool(
                     new JobID(),
                     new DefaultAllocatedSlotPool(),
                     ignored -> {},
                     blockedTaskManagerChecker,
                     Time.seconds(20),
-                    Time.seconds(20));
+                    Time.seconds(20),
+                    slotRequestMaxInterval,
+                    componentMainThreadExecutor);
+        }
+
+        public BlocklistDeclarativeSlotPool build() {
+            return build(Duration.ZERO, forMainThread());

Review Comment:
   I'm curious why we hard-code this here.
   
   `DefaultDeclarativeSlotPoolTestBase` defines a `slotRequestMaxInterval` parameter, so why not use that parameter in each test?
   
   IIUC, hard-coding it to ZERO means the parameter doesn't take effect when `build()` is called.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTestBase.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+
+import org.assertj.core.util.Lists;
+
+import java.time.Duration;
+import java.util.List;
+
+/**
+ * Tests base class for the {@link DefaultDeclarativeSlotPool} & {@link
+ * BlocklistDeclarativeSlotPool}.
+ */
+abstract class DefaultDeclarativeSlotPoolTestBase {
+
+    // The bool attribute is used only for readable for the parameterized test 
cases.
+    @Parameter boolean enableSlotRequestByBatch;

Review Comment:
   I don't think we need `enableSlotRequestByBatch`.
   
   IIUC, a `slotRequestMaxInterval` of 0 means batch requests are disabled, so you define 2 parameters to determine one behavior. If `enableSlotRequestByBatch` is true and `slotRequestMaxInterval` is 0, how do you handle it?
   
   This makes it easy for the code to become inconsistent.
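
One way to keep a single source of truth, sketched under the assumption stated in this comment that a zero interval disables batching. `SlotRequestBatching` is a hypothetical class for illustration. The boolean is derived from the `Duration`, so the contradictory combination cannot be expressed, and a readable label for parameterized test names can still be produced.

```java
import java.time.Duration;

/** Hypothetical holder: the interval is the only stored state. */
final class SlotRequestBatching {
    private final Duration slotRequestMaxInterval;

    SlotRequestBatching(Duration slotRequestMaxInterval) {
        if (slotRequestMaxInterval.isNegative()) {
            throw new IllegalArgumentException("interval must not be negative");
        }
        this.slotRequestMaxInterval = slotRequestMaxInterval;
    }

    /** Derived, never stored: batching is on exactly when the interval is non-zero. */
    boolean isBatchEnabled() {
        return !slotRequestMaxInterval.isZero();
    }

    /** Readable label, e.g. for naming parameterized test cases. */
    String describe() {
        return "batch=" + isBatchEnabled()
                + ", interval=" + slotRequestMaxInterval.toMillis() + "ms";
    }
}
```

With this shape the test base would need only the `Duration` parameter, and the readability that the extra boolean was meant to provide comes from the derived label.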



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTestBase.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+
+import org.assertj.core.util.Lists;
+
+import java.time.Duration;
+import java.util.List;
+
+/**
+ * Tests base class for the {@link DefaultDeclarativeSlotPool} & {@link
+ * BlocklistDeclarativeSlotPool}.
+ */
+abstract class DefaultDeclarativeSlotPoolTestBase {
+
+    // The bool attribute is used only for readable for the parameterized test 
cases.
+    @Parameter boolean enableSlotRequestByBatch;
+
+    @Parameter(1)
+    Duration slotRequestMaxInterval;
+
+    @Parameter(2)
+    ComponentMainThreadExecutor componentMainThreadExecutor;

Review Comment:
   `getParametersCouples` provides 2 sets of parameters, but `componentMainThreadExecutor` is always the same.
   
   I'm curious why `componentMainThreadExecutor` is a parameter. Why don't we initialize it directly? For example: `final ComponentMainThreadExecutor componentMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread()`.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTestBase.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+
+import org.assertj.core.util.Lists;
+
+import java.time.Duration;
+import java.util.List;
+
+/**
+ * Tests base class for the {@link DefaultDeclarativeSlotPool} & {@link
+ * BlocklistDeclarativeSlotPool}.

Review Comment:
   It's better for an `abstract class` or `interface` not to care about its implementation classes.
   
   In other words, can you ensure that this comment will be updated if a new implementation class is introduced in the future?
   
   IIUC, `DefaultDeclarativeSlotPoolTestBase` is the test base for `DefaultDeclarativeSlotPool` and its subclasses, so we don't need to mention `BlocklistDeclarativeSlotPool`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:
##########
@@ -599,4 +632,27 @@ private ResourceCounter getFulfilledRequirements(
     ResourceCounter getFulfilledResourceRequirements() {
         return fulfilledResourceRequirements;
     }
+
+    @VisibleForTesting
+    @Nonnull
+    public ComponentMainThreadExecutor getComponentMainThreadExecutor() {
+        return componentMainThreadExecutor;
+    }
+
+    @VisibleForTesting
+    @Nonnull
+    public Duration getSlotRequestMaxInterval() {
+        return slotRequestMaxInterval;
+    }

Review Comment:
   These 2 methods can be removed as well.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java:
##########
@@ -548,4 +558,13 @@ void increaseResourceRequirementsBy(ResourceCounter 
increment) {
     boolean isBatchSlotRequestTimeoutCheckEnabled() {
         return !isBatchSlotRequestTimeoutCheckDisabled;
     }
+
+    @VisibleForTesting
+    public void tryWaitSlotRequestMaxIntervalTimeout() {

Review Comment:
   This method is not called, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
