Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4937#discussion_r148570875
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
---
@@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager()
throws Exception {
fail("wrong exception: " + e);
}
}
+
+ @Test
+ public void testCancelSlotAllocation() throws Exception {
+ final JobID jid = new JobID();
+
+ final SlotPool pool = new SlotPool(
+ rpcService, jid,
+ SystemClock.getInstance(),
+ Time.days(1), Time.days(1),
+ Time.seconds(3) // this is the timeout for the
request tested here
+ );
+ pool.start(JobMasterId.generate(), "foobar");
+ SlotPoolGateway slotPoolGateway =
pool.getSelfGateway(SlotPoolGateway.class);
+
+ // 1. test the pending request is in
waitingResourceManagerRequests
+ AllocationID allocationID = new AllocationID();
+ CompletableFuture<SimpleSlot> future =
slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null,
Time.seconds(1));
+
+ try {
+ future.get(2, TimeUnit.SECONDS);
+ fail("We expected a AskTimeoutException.");
+ }
+ catch (ExecutionException e) {
+ assertEquals(AskTimeoutException.class,
e.getCause().getClass());
+ }
+ catch (Exception e) {
+ fail("wrong exception: " + e);
+ }
+
+ assertEquals(1, pool.getNumOfWaitingForResourceRequests());
+
+ pool.cancelSlotAllocation(allocationID);
+ assertEquals(0, pool.getNumOfWaitingForResourceRequests());
+
+ // 2. test the pending request is in pendingRequests
+ ResourceManagerGateway resourceManagerGateway =
SlotPoolTest.createResourceManagerGatewayMock();
+ pool.connectToResourceManager(resourceManagerGateway);
+
+ AllocationID allocationID2 = new AllocationID();
+ future = slotPoolGateway.allocateSlot(allocationID2,
DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+ try {
+ future.get(2, TimeUnit.SECONDS);
+ fail("We expected a AskTimeoutException.");
+ }
+ catch (ExecutionException e) {
+ assertEquals(AskTimeoutException.class,
e.getCause().getClass());
+ }
+ catch (Exception e) {
+ fail("wrong exception: " + e);
+ }
+
+ assertEquals(1, pool.getNumOfPendingRequests());
+
+ pool.cancelSlotAllocation(allocationID2);
+ assertEquals(0, pool.getNumOfPendingRequests());
+ //verify(resourceManagerGateway,
times(1)).cancelSlotRequest(jid, any(JobMasterId.class), allocationID2);
+
+ // 3. test the allocation is timed out in client side but the
request is fulfilled in slot pool
+ AllocationID allocationID3 = new AllocationID();
+ future = slotPoolGateway.allocateSlot(allocationID3,
DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+ try {
+ future.get(2, TimeUnit.SECONDS);
+ fail("We expected a AskTimeoutException.");
+ }
+ catch (ExecutionException e) {
+ assertEquals(AskTimeoutException.class,
e.getCause().getClass());
+ }
+ catch (Exception e) {
+ fail("wrong exception: " + e);
+ }
+
+ ResourceID resourceID = ResourceID.generate();
+ AllocatedSlot allocatedSlot =
SlotPoolTest.createAllocatedSlot(resourceID, allocationID3, jid,
DEFAULT_TESTING_PROFILE);
+ slotPoolGateway.registerTaskManager(resourceID);
+ assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+
+ assertEquals(0, pool.getNumOfPendingRequests());
+ assertTrue(pool.getAllocatedSlots().contains(allocationID3));
+
+ pool.cancelSlotAllocation(allocationID3);
+ assertFalse(pool.getAllocatedSlots().contains(allocationID3));
+ assertTrue(pool.getAvailableSlots().contains(allocationID3));
+ }
+
+ @Test
+ public void testProviderAndOwner() throws Exception {
--- End diff --
What does this test test? Maybe add a short description.
---