1996fanrui commented on code in PR #25113: URL: https://github.com/apache/flink/pull/25113#discussion_r1689012554
########## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractSlotPoolServiceFactory.java: ########## @@ -34,14 +36,18 @@ public abstract class AbstractSlotPoolServiceFactory implements SlotPoolServiceF @Nonnull protected final Time batchSlotTimeout; + @Nonnull protected final Duration slotRequestMaxInterval; + protected AbstractSlotPoolServiceFactory( @Nonnull Clock clock, @Nonnull Time rpcTimeout, @Nonnull Time slotIdleTimeout, - @Nonnull Time batchSlotTimeout) { + @Nonnull Time batchSlotTimeout, Review Comment: The `Time` has been deprecated. Would you mind refactoring Time to Duration with a hotfix commit before your commit with in current PR? Note: Only refactor all classes using Time related to current PR is enough. It's useful for avoid code conflict in the future, and it's helpful for removing deprecated api. ########## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java: ########## @@ -1115,6 +1116,11 @@ public CompletableFuture<Acknowledge> updateJobResourceRequirements( return CompletableFuture.completedFuture(Acknowledge.get()); } + @VisibleForTesting + public SlotPoolService getSlotPoolService() { Review Comment: As I understand, exposing this method is for `testSlotRequestMaxIntervalAndComponentMainThreadExecutorPassing`, right? I don't think test passing parameters is needed. If we test all passing parameters, the tests will be very bloated. Or why other parameters are not tested? In contrast, the Flink community prefers black-box testing, that is, you only need to test that your functions meet expectations, without paying attention to implementation details. ########## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java: ########## @@ -81,18 +84,26 @@ public DeclarativeSlotPoolService( DeclarativeSlotPoolFactory declarativeSlotPoolFactory, Clock clock, Time idleSlotTimeout, - Time rpcTimeout) { + Time rpcTimeout, + Duration slotRequestMaxInterval, + @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) { this.jobId = jobId; this.clock = clock; this.rpcTimeout = rpcTimeout; this.registeredTaskManagers = new HashSet<>(); this.declarativeSlotPool = declarativeSlotPoolFactory.create( - jobId, this::declareResourceRequirements, idleSlotTimeout, rpcTimeout); + jobId, + this::declareResourceRequirements, + idleSlotTimeout, + rpcTimeout, + slotRequestMaxInterval, + componentMainThreadExecutor); } - protected DeclarativeSlotPool getDeclarativeSlotPool() { + @VisibleForTesting + public DeclarativeSlotPool getDeclarativeSlotPool() { Review Comment: This change can be reverted if we don't need `JobMasterTest#testSlotRequestMaxIntervalAndComponentMainThreadExecutorPassing`. ########## flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java: ########## @@ -360,7 +360,9 @@ private DeclarativeSlotPoolService createDeclarativeSlotPoolService( declarativeSlotPoolFactory, SystemClock.getInstance(), Time.seconds(20L), - Time.seconds(20L)); + Time.seconds(20L), + Duration.ZERO, + forMainThread()); declarativeSlotPoolService.start(jobMasterId, address, mainThreadExecutor); Review Comment: ```suggestion mainThreadExecutor); declarativeSlotPoolService.start(jobMasterId, address, mainThreadExecutor); ``` This test class already has the `mainThreadExecutor` ########## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java: ########## @@ -103,12 +108,20 @@ public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool { private final RequirementMatcher requirementMatcher = new DefaultRequirementMatcher(); + @Nonnull private final ComponentMainThreadExecutor componentMainThreadExecutor; + + // For slots(resources) requests by batch. + @Nonnull private final Duration slotRequestMaxInterval; + @Nullable private ScheduledFuture<?> slotRequestMaxIntervalTimeoutFuture; Review Comment: ```suggestion @Nullable private ScheduledFuture<?> slotRequestFuture; ``` ########## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java: ########## @@ -599,4 +632,27 @@ private ResourceCounter getFulfilledRequirements( ResourceCounter getFulfilledResourceRequirements() { return fulfilledResourceRequirements; } + + @VisibleForTesting + @Nonnull + public ComponentMainThreadExecutor getComponentMainThreadExecutor() { + return componentMainThreadExecutor; + } + + @VisibleForTesting + @Nonnull + public Duration getSlotRequestMaxInterval() { + return slotRequestMaxInterval; + } + + @VisibleForTesting + public void tryWaitSlotRequestMaxIntervalTimeout() { Review Comment: ```suggestion void tryWaitSlotRequestIsDone() { ``` 1. Don't need public. 2. We are waiting for SlotRequestIsDone instead of fixed timeout. ########## flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java: ########## @@ -54,12 +55,11 @@ void testDeclarativeSlotPoolTakesPreferredAllocationsIntoAccount() throws Except TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), - PreferredAllocationRequestSlotMatchingStrategy.INSTANCE); + PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, + Duration.ZERO, + forMainThread()); - declarativeSlotPoolBridge.start( - JobMasterId.generate(), - "localhost", - ComponentMainThreadExecutorServiceAdapter.forMainThread()); + declarativeSlotPoolBridge.start(JobMasterId.generate(), "localhost", forMainThread()); Review Comment: New 2 threadExecutor isn't needed. ########## flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java: ########## @@ -280,7 +279,9 @@ static DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge( rpcTimeout, Time.seconds(20), Time.seconds(20), - requestSlotMatchingStrategy); + requestSlotMatchingStrategy, + Duration.ZERO, + forMainThread()); Review Comment: ```suggestion mainThreadExecutor); ``` Same comment, we don't need to new a separate threadExecutor each time, reuse `mainThreadExecutor` is enough. ########## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java: ########## @@ -81,18 +84,26 @@ public DeclarativeSlotPoolService( DeclarativeSlotPoolFactory declarativeSlotPoolFactory, Clock clock, Time idleSlotTimeout, - Time rpcTimeout) { + Time rpcTimeout, + Duration slotRequestMaxInterval, + @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) { Review Comment: We already added `mainThreadExecutor`, so I don't think the `SlotPoolService#start` need `ComponentMainThreadExecutor mainThreadExecutor` parameter for now. ########## flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java: ########## @@ -299,14 +310,22 @@ public BlocklistDeclarativeSlotPoolBuilder setBlockedTaskManagerChecker( return this; } - public BlocklistDeclarativeSlotPool build() { + public BlocklistDeclarativeSlotPool build( + Duration slotRequestMaxInterval, + ComponentMainThreadExecutor componentMainThreadExecutor) { return new BlocklistDeclarativeSlotPool( new JobID(), new DefaultAllocatedSlotPool(), ignored -> {}, blockedTaskManagerChecker, Time.seconds(20), - Time.seconds(20)); + Time.seconds(20), + slotRequestMaxInterval, + componentMainThreadExecutor); + } + + public BlocklistDeclarativeSlotPool build() { + return build(Duration.ZERO, forMainThread()); Review Comment: I'm curious why do we hard code here. `DefaultDeclarativeSlotPoolTestBase` defined `slotRequestMaxInterval` parameter, why don't use parameter for each test? IIUC, hard code it to ZERO means parameter doesn't take effect when build is called. ########## flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTestBase.java: ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; + +import org.assertj.core.util.Lists; + +import java.time.Duration; +import java.util.List; + +/** + * Tests base class for the {@link DefaultDeclarativeSlotPool} & {@link + * BlocklistDeclarativeSlotPool}. + */ +abstract class DefaultDeclarativeSlotPoolTestBase { + + // The bool attribute is used only for readable for the parameterized test cases. + @Parameter boolean enableSlotRequestByBatch; Review Comment: I don't think we need the `enableSlotRequestByBatch`. IIUC, the `slotRequestMaxInterval` is 0 means disable batch request. So you define 2 parameters to determine one behavior. If `enableSlotRequestByBatch` is true and `slotRequestMaxInterval` is 0. How do you handle it? It's easy to let the code is inconsistent. ########## flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTestBase.java: ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; + +import org.assertj.core.util.Lists; + +import java.time.Duration; +import java.util.List; + +/** + * Tests base class for the {@link DefaultDeclarativeSlotPool} & {@link + * BlocklistDeclarativeSlotPool}. + */ +abstract class DefaultDeclarativeSlotPoolTestBase { + + // The bool attribute is used only for readable for the parameterized test cases. + @Parameter boolean enableSlotRequestByBatch; + + @Parameter(1) + Duration slotRequestMaxInterval; + + @Parameter(2) + ComponentMainThreadExecutor componentMainThreadExecutor; Review Comment: `getParametersCouples` has 2 types parameter, but `componentMainThreadExecutor` always same. I'm curious why `componentMainThreadExecutor` is a parameter? Why don't we initialize it directly? Such as: `final ComponentMainThreadExecutor componentMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread()`. ########## flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTestBase.java: ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; + +import org.assertj.core.util.Lists; + +import java.time.Duration; +import java.util.List; + +/** + * Tests base class for the {@link DefaultDeclarativeSlotPool} & {@link + * BlocklistDeclarativeSlotPool}. Review Comment: It's better to let `abstract class` and `interface` don't care about the implementation class. In other words, could you ensure the comment be updated if introducing new implementation class in the future? IIUC, `DefaultDeclarativeSlotPoolTestBase` as the TestBase for `DefaultDeclarativeSlotPool` and its subclass. So we don't need mention `BlocklistDeclarativeSlotPool`. ########## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java: ########## @@ -599,4 +632,27 @@ private ResourceCounter getFulfilledRequirements( ResourceCounter getFulfilledResourceRequirements() { return fulfilledResourceRequirements; } + + @VisibleForTesting + @Nonnull + public ComponentMainThreadExecutor getComponentMainThreadExecutor() { + return componentMainThreadExecutor; + } + + @VisibleForTesting + @Nonnull + public Duration getSlotRequestMaxInterval() { + return slotRequestMaxInterval; + } Review Comment: These 2 methods can be removed as well ########## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java: ########## @@ -548,4 +558,13 @@ void increaseResourceRequirementsBy(ResourceCounter increment) { boolean isBatchSlotRequestTimeoutCheckEnabled() { return !isBatchSlotRequestTimeoutCheckDisabled; } + + @VisibleForTesting + public void tryWaitSlotRequestMaxIntervalTimeout() { Review Comment: This method is not called, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org