Thesharing commented on a change in pull request #12917:
URL: https://github.com/apache/flink/pull/12917#discussion_r462017275



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBuilder.java
##########
@@ -35,9 +36,10 @@
  */
 public class SlotPoolBuilder {
 
-       private ComponentMainThreadExecutor componentMainThreadExecutor;
-       private ResourceManagerGateway resourceManagerGateway = new 
TestingResourceManagerGateway();

Review comment:
       I've already added the default value back in the fix-up commit.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -145,14 +133,13 @@ public void testAllocateSimpleSlot() throws Exception {
 
                        assertTrue(slotPool.offerSlot(taskManagerLocation, 
taskManagerGateway, slotOffer));
 
-                       LogicalSlot slot = future.get(1, TimeUnit.SECONDS);
+                       final PhysicalSlot physicalSlot = future.get(1, 
TimeUnit.SECONDS);
                        assertTrue(future.isDone());
-                       assertTrue(slot.isAlive());
-                       assertEquals(taskManagerLocation, 
slot.getTaskManagerLocation());
+                       assertEquals(taskManagerLocation, 
physicalSlot.getTaskManagerLocation());
+                       assertEquals(slotRequest.getAllocationId(), 
physicalSlot.getAllocationId());
                }
        }
 
-       @Nonnull
        private SlotPoolImpl createSlotPoolImpl() {

Review comment:
       Okay, when I rearrange the commits I'll take care of this.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBuilder.java
##########
@@ -54,23 +56,47 @@ public SlotPoolBuilder setBatchSlotTimeout(Time 
batchSlotTimeout) {
                return this;
        }
 
+       public SlotPoolBuilder setIdleSlotTimeout(Time idleSlotTimeout) {
+               this.idleSlotTimeout = idleSlotTimeout;
+               return this;
+       }
+
        public SlotPoolBuilder setClock(Clock clock) {
                this.clock = clock;
                return this;
        }
 
-       public TestingSlotPoolImpl build() throws Exception {
+       public TestingSlotPoolImpl build(JobID jobID, Boolean 
connectToResourceManager) throws Exception {
                final TestingSlotPoolImpl slotPool = new TestingSlotPoolImpl(
-                       new JobID(),
+                       jobID,
                        clock,
                        TestingUtils.infiniteTime(),
-                       TestingUtils.infiniteTime(),
+                       idleSlotTimeout,
                        batchSlotTimeout);
 
                slotPool.start(JobMasterId.generate(), "foobar", 
componentMainThreadExecutor);
 
-               CompletableFuture.runAsync(() -> 
slotPool.connectToResourceManager(resourceManagerGateway), 
componentMainThreadExecutor).join();
+               if (connectToResourceManager) {

Review comment:
       Done.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -510,33 +496,24 @@ public void testCheckIdleSlot() throws Exception {
 
                        slotPool.triggerCheckIdleSlot();
 
-                       final AllocationID freedSlot = 
freedSlots.poll(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+                       final AllocationID freedSlot = 
freedSlots.poll(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
 
                        assertThat(freedSlot, Matchers.is(expiredSlotID));
                        assertThat(freedSlots.isEmpty(), Matchers.is(true));
                }
        }
 
-       private TestingSlotPoolImpl createSlotPoolImpl(ManualClock clock) {
-               return new TestingSlotPoolImpl(
-                       jobId,
-                       clock,
-                       TestingUtils.infiniteTime(),
-                       timeout,
-                       TestingUtils.infiniteTime());
-       }
-
        /**
         * Tests that idle slots which cannot be released will be discarded. 
See FLINK-11059.
         */
        @Test
        public void testDiscardIdleSlotIfReleasingFailed() throws Exception {
                final ManualClock clock = new ManualClock();
 
-               try (TestingSlotPoolImpl slotPool = createSlotPoolImpl(clock)) {
-
-                       setupSlotPool(slotPool, resourceManagerGateway, 
mainThreadExecutor);
-
+               try (TestingSlotPoolImpl slotPool = slotPoolBuilder

Review comment:
       Done.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -665,19 +643,21 @@ public void testCreateAllocatedSlotReport() throws 
Exception {
                        slotRequestFuture.get();
 
                        final AllocatedSlotReport slotReport = 
slotPool.createAllocatedSlotReport(taskManagerLocation.getResourceID());
-                       assertThat(jobId, is(slotReport.getJobId()));
+                       assertThat(jobID, is(slotReport.getJobId()));
                        assertThat(slotReport.getAllocatedSlotInfos(), 
containsInAnyOrder(isEachEqual(allocatedSlotInfos)));
                }
        }
 
        @Test
        public void testCalculationOfTaskExecutorUtilization() throws Exception 
{
                try (final SlotPoolImpl slotPool = createAndSetUpSlotPool()) {
+                       
slotPool.registerTaskManager(taskManagerLocation.getResourceID());
+
                        final TaskManagerLocation firstTaskManagerLocation = 
new LocalTaskManagerLocation();
                        final TaskManagerLocation secondTaskManagerLocation = 
new LocalTaskManagerLocation();
 
-                       final List<AllocationID> firstTaskManagersSlots = 
registerAndOfferSlots(firstTaskManagerLocation, slotPool, 4);
-                       final List<AllocationID> secondTaskManagersSlots = 
registerAndOfferSlots(secondTaskManagerLocation, slotPool, 4);
+                       final List<AllocationID> firstTaskManagersSlots = 
offerSlots(firstTaskManagerLocation, slotPool, 4);

Review comment:
       I've undone this modification.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -88,28 +84,21 @@
 /**
  * Tests for the {@link SlotPoolImpl}.
  */
-public class SlotPoolImplTest extends TestLogger {
+public class SlotPoolImplTest extends SlotPoolTestBase {
 
-       private final Time timeout = Time.seconds(10L);
-
-       private JobID jobId;
+       private static final Time TIMEOUT = Time.seconds(10L);
+       private static final ComponentMainThreadExecutor mainThreadExecutor =

Review comment:
       Removed.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestBase.java
##########
@@ -0,0 +1,55 @@
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Test base for {@link SlotPool} related test cases.
+ */
+public abstract class SlotPoolTestBase extends TestLogger {
+       protected static final Time TIMEOUT = Time.seconds(10L);
+
+       protected final ComponentMainThreadExecutor mainThreadExecutor =
+               ComponentMainThreadExecutorServiceAdapter.forMainThread();
+
+       protected TestingResourceManagerGateway resourceManagerGateway;
+       protected SlotPoolBuilder slotPoolBuilder;
+
+       @Before
+       public void setup() throws Exception {
+               resourceManagerGateway = new TestingResourceManagerGateway();
+               slotPoolBuilder = new 
SlotPoolBuilder(mainThreadExecutor).setResourceManagerGateway(resourceManagerGateway);
+       }
+
+       protected TestingSlotPoolImpl createAndSetUpSlotPool() throws Exception 
{
+               return slotPoolBuilder.build();
+       }
+
+       protected void requestNewAllocatedSlots(final SlotPool slotPool, final 
SlotRequestId... slotRequestIds) {
+               for (SlotRequestId slotRequestId : slotRequestIds) {
+                       requestNewAllocatedSlot(slotPool, slotRequestId);
+               }
+       }
+
+       protected CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(

Review comment:
       Done.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestBase.java
##########
@@ -0,0 +1,55 @@
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Test base for {@link SlotPool} related test cases.
+ */
+public abstract class SlotPoolTestBase extends TestLogger {
+       protected static final Time TIMEOUT = Time.seconds(10L);
+
+       protected final ComponentMainThreadExecutor mainThreadExecutor =
+               ComponentMainThreadExecutorServiceAdapter.forMainThread();
+
+       protected TestingResourceManagerGateway resourceManagerGateway;
+       protected SlotPoolBuilder slotPoolBuilder;
+
+       @Before
+       public void setup() throws Exception {
+               resourceManagerGateway = new TestingResourceManagerGateway();
+               slotPoolBuilder = new 
SlotPoolBuilder(mainThreadExecutor).setResourceManagerGateway(resourceManagerGateway);
+       }
+
+       protected TestingSlotPoolImpl createAndSetUpSlotPool() throws Exception 
{
+               return slotPoolBuilder.build();
+       }
+
+       protected void requestNewAllocatedSlots(final SlotPool slotPool, final 
SlotRequestId... slotRequestIds) {
+               for (SlotRequestId slotRequestId : slotRequestIds) {
+                       requestNewAllocatedSlot(slotPool, slotRequestId);
+               }
+       }
+
+       protected CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
+               final SlotPool slotPool,
+               final SlotRequestId slotRequestId) {

Review comment:
       Done.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -635,8 +612,9 @@ public void testFreeFailedSlots() throws Exception {
         */
        @Test
        public void testCreateAllocatedSlotReport() throws Exception {
+               final JobID jobID = new JobID();

Review comment:
       Done. I'll pay attention to this in the future.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBuilder.java
##########
@@ -54,23 +56,47 @@ public SlotPoolBuilder setBatchSlotTimeout(Time 
batchSlotTimeout) {
                return this;
        }
 
+       public SlotPoolBuilder setIdleSlotTimeout(Time idleSlotTimeout) {
+               this.idleSlotTimeout = idleSlotTimeout;
+               return this;
+       }
+
        public SlotPoolBuilder setClock(Clock clock) {
                this.clock = clock;
                return this;
        }
 
-       public TestingSlotPoolImpl build() throws Exception {
+       public TestingSlotPoolImpl build(JobID jobID, Boolean 
connectToResourceManager) throws Exception {

Review comment:
       The param `connectToResourceManager` has been removed in the fix-up 
commit.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBuilder.java
##########
@@ -35,9 +36,10 @@
  */
 public class SlotPoolBuilder {
 
-       private ComponentMainThreadExecutor componentMainThreadExecutor;
-       private ResourceManagerGateway resourceManagerGateway = new 
TestingResourceManagerGateway();
-       private Time batchSlotTimeout = Time.milliseconds(2L);
+       private final ComponentMainThreadExecutor componentMainThreadExecutor;
+       private ResourceManagerGateway resourceManagerGateway;
+       private Time batchSlotTimeout = 
Time.milliseconds(JobManagerOptions.SLOT_IDLE_TIMEOUT.defaultValue());

Review comment:
       Because in most slot pool related test cases the batchSlotTimeout is set 
to be this value, and only in some test cases, the batchSlotTimeout is set to 
be 2 milliseconds. I set the default value to be this value, and explicitly set 
other timeout values in their own cases. 

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -635,8 +612,9 @@ public void testFreeFailedSlots() throws Exception {
         */
        @Test
        public void testCreateAllocatedSlotReport() throws Exception {
+               final JobID jobID = new JobID();
 
-               try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) {
+               try (SlotPoolImpl slotPool = slotPoolBuilder.build(jobID)) {

Review comment:
       Done.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -88,28 +84,21 @@
 /**
  * Tests for the {@link SlotPoolImpl}.
  */
-public class SlotPoolImplTest extends TestLogger {
+public class SlotPoolImplTest extends SlotPoolTestBase {
 
-       private final Time timeout = Time.seconds(10L);
-
-       private JobID jobId;
+       private static final Time TIMEOUT = Time.seconds(10L);

Review comment:
       Removed.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -275,14 +260,14 @@ public void testOfferSlot() throws Exception {
 
                        // we'll also accept non requested slots
                        assertTrue(slotPool.offerSlot(taskManagerLocation, 
taskManagerGateway, nonRequestedSlotOffer));
+                       assertEquals(1, slotPool.getAllocatedSlots().size());
+                       final PhysicalSlot slot = 
future.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+                       assertEquals(taskManagerLocation, 
slot.getTaskManagerLocation());
+                       assertEquals(nonRequestedSlotOffer.getAllocationId(), 
slot.getAllocationId());

Review comment:
       I'll put it into the first commit when I rearrange the commits.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -777,26 +764,15 @@ public void 
testSlotsOfferedWithoutResourceManagerConnected() throws Exception {
                }
        }
 
-       private void requestNewAllocatedSlots(final SlotPool slotPool, final 
SlotRequestId... slotRequestIds) {
-               for (SlotRequestId slotRequestId : slotRequestIds) {
-                       requestNewAllocatedSlot(slotPool, slotRequestId);
-               }
-       }
-
-       private CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
-                       final SlotPool slotPool,
-                       final SlotRequestId slotRequestId) {
-               return slotPool.requestNewAllocatedSlot(slotRequestId, 
ResourceProfile.UNKNOWN, timeout);
-       }
-
        private void offerSlot(final SlotPoolImpl slotPool, final AllocationID 
allocationId) {
                final SlotOffer slotOffer = new SlotOffer(allocationId, 0, 
ResourceProfile.ANY);
-               
slotPool.registerTaskManager(taskManagerLocation.getResourceID());

Review comment:
       I've already brought it back in the fix-up commit.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestBase.java
##########
@@ -0,0 +1,55 @@
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Test base for {@link SlotPool} related test cases.
+ */
+public abstract class SlotPoolTestBase extends TestLogger {
+       protected static final Time TIMEOUT = Time.seconds(10L);
+
+       protected final ComponentMainThreadExecutor mainThreadExecutor =
+               ComponentMainThreadExecutorServiceAdapter.forMainThread();
+
+       protected TestingResourceManagerGateway resourceManagerGateway;
+       protected SlotPoolBuilder slotPoolBuilder;
+
+       @Before
+       public void setup() throws Exception {
+               resourceManagerGateway = new TestingResourceManagerGateway();
+               slotPoolBuilder = new 
SlotPoolBuilder(mainThreadExecutor).setResourceManagerGateway(resourceManagerGateway);
+       }
+
+       protected TestingSlotPoolImpl createAndSetUpSlotPool() throws Exception 
{
+               return slotPoolBuilder.build();
+       }
+
+       protected void requestNewAllocatedSlots(final SlotPool slotPool, final 
SlotRequestId... slotRequestIds) {
+               for (SlotRequestId slotRequestId : slotRequestIds) {
+                       requestNewAllocatedSlot(slotPool, slotRequestId);
+               }
+       }
+
+       protected CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
+               final SlotPool slotPool,
+               final SlotRequestId slotRequestId) {
+               return requestNewAllocatedSlot(slotPool, slotRequestId, 
TIMEOUT);
+       }
+
+       protected CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to