tillrohrmann commented on a change in pull request #15812: URL: https://github.com/apache/flink/pull/15812#discussion_r666971878
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java ########## @@ -21,73 +21,84 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; -import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.util.ResourceCounter; import org.apache.flink.util.clock.Clock; import org.apache.flink.util.clock.SystemClock; import javax.annotation.Nullable; import java.util.concurrent.CompletableFuture; -/** Builder for a {@link TestingSlotPoolImpl}. */ -public class SlotPoolBuilder { +/** Builder for a {@link DeclarativeSlotPool}. 
*/ +public class DeclarativeSlotPoolBridgeBuilder { private final ComponentMainThreadExecutor componentMainThreadExecutor; private JobID jobId = new JobID(); private Time batchSlotTimeout = Time.milliseconds(JobManagerOptions.SLOT_IDLE_TIMEOUT.defaultValue()); - private Time idleSlotTimeout = TestingUtils.infiniteTime(); + private Time idleSlotTimeout = Time.days(1); private Clock clock = SystemClock.getInstance(); @Nullable private ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); - public SlotPoolBuilder(ComponentMainThreadExecutor componentMainThreadExecutor) { + private boolean autoLoadRequirement = true; + private int resourceRequirementCount = 100; + + public DeclarativeSlotPoolBridgeBuilder( + ComponentMainThreadExecutor componentMainThreadExecutor) { this.componentMainThreadExecutor = componentMainThreadExecutor; } - public SlotPoolBuilder setResourceManagerGateway( + public DeclarativeSlotPoolBridgeBuilder setResourceManagerGateway( @Nullable ResourceManagerGateway resourceManagerGateway) { this.resourceManagerGateway = resourceManagerGateway; return this; } - public SlotPoolBuilder setBatchSlotTimeout(Time batchSlotTimeout) { + public DeclarativeSlotPoolBridgeBuilder setBatchSlotTimeout(Time batchSlotTimeout) { this.batchSlotTimeout = batchSlotTimeout; return this; } - public SlotPoolBuilder setIdleSlotTimeout(Time idleSlotTimeout) { + public DeclarativeSlotPoolBridgeBuilder setIdleSlotTimeout(Time idleSlotTimeout) { this.idleSlotTimeout = idleSlotTimeout; return this; } - public SlotPoolBuilder setClock(Clock clock) { + public DeclarativeSlotPoolBridgeBuilder setClock(Clock clock) { this.clock = clock; return this; } - public SlotPoolBuilder setJobId(JobID jobId) { + public DeclarativeSlotPoolBridgeBuilder setJobId(JobID jobId) { this.jobId = jobId; return this; } - public TestingSlotPoolImpl build() throws Exception { - final TestingSlotPoolImpl slotPool = - new TestingSlotPoolImpl( + public DeclarativeSlotPoolBridge 
build() throws Exception { + final DeclarativeSlotPoolBridge slotPool = + new DeclarativeSlotPoolBridge( jobId, + new DefaultDeclarativeSlotPoolFactory(), clock, - TestingUtils.infiniteTime(), + Time.days(1), idleSlotTimeout, batchSlotTimeout); slotPool.start(JobMasterId.generate(), "foobar", componentMainThreadExecutor); - + if (this.autoLoadRequirement) { + slotPool.getDeclarativeSlotPool() + .setResourceRequirements( + ResourceCounter.withResource( + ResourceProfile.ANY, this.resourceRequirementCount)); + } Review comment: Is this strictly required? ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java ########## @@ -21,73 +21,84 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; -import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.util.ResourceCounter; import org.apache.flink.util.clock.Clock; import org.apache.flink.util.clock.SystemClock; import javax.annotation.Nullable; import java.util.concurrent.CompletableFuture; -/** Builder for a {@link TestingSlotPoolImpl}. */ -public class SlotPoolBuilder { +/** Builder for a {@link DeclarativeSlotPool}. */ Review comment: ```suggestion /** Builder for a {@link DeclarativeSlotPoolBridge}. 
*/ ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java ########## @@ -97,4 +108,15 @@ public TestingSlotPoolImpl build() throws Exception { return slotPool; } + + public DeclarativeSlotPoolBridgeBuilder setAutoLoadRequirementCount( + int resourceRequirementCount) { + this.resourceRequirementCount = resourceRequirementCount; + return this; + } + + public DeclarativeSlotPoolBridgeBuilder disableAutoLoadRequirements() { + this.autoLoadRequirement = false; + return this; + } Review comment: Why is this needed? ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java ########## @@ -50,8 +48,8 @@ import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; -/** Tests how the {@link SlotPoolImpl} completes slot requests. */ -public class SlotPoolRequestCompletionTest extends TestLogger { +/** Tests how the {@link SlotPool} completes slot requests. */ Review comment: ```suggestion /** Tests how the {@link DeclarativeSlotPoolBridge} completes slot requests. 
*/ ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ########## @@ -97,8 +93,10 @@ private void executeOperationForAllExecutions( } } - private SlotPoolImpl createSlotPoolImpl() { - return new TestingSlotPoolImpl(TEST_JOB_ID); + private SlotPool createSlotPoolImpl() throws Exception { Review comment: ```suggestion private SlotPool createSlotPool() throws Exception { ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java ########## @@ -109,8 +102,7 @@ private void runSlotRequestCompletionTest( slotPool.registerTaskManager(taskManagerLocation.getResourceID()); // create a slot offer that is initiated by the last request - final AllocationID lastAllocationId = - rmReceivedSlotRequests.get(requestNum - 1).getAllocationId(); + final AllocationID lastAllocationId = new AllocationID(); Review comment: `lastAllocationId` should no longer matter. I think this can be inlined. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org