Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148571514 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java --- @@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception { fail("wrong exception: " + e); } } + + @Test + public void testCancelSlotAllocation() throws Exception { + final JobID jid = new JobID(); + + final SlotPool pool = new SlotPool( + rpcService, jid, + SystemClock.getInstance(), + Time.days(1), Time.days(1), + Time.seconds(3) // this is the timeout for the request tested here + ); + pool.start(JobMasterId.generate(), "foobar"); + SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class); + + // 1. test the pending request is in waitingResourceManagerRequests + AllocationID allocationID = new AllocationID(); + CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + assertEquals(1, pool.getNumOfWaitingForResourceRequests()); + + pool.cancelSlotAllocation(allocationID); + assertEquals(0, pool.getNumOfWaitingForResourceRequests()); + + // 2. 
test the pending request is in pendingRequests + ResourceManagerGateway resourceManagerGateway = SlotPoolTest.createResourceManagerGatewayMock(); + pool.connectToResourceManager(resourceManagerGateway); + + AllocationID allocationID2 = new AllocationID(); + future = slotPoolGateway.allocateSlot(allocationID2, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + assertEquals(1, pool.getNumOfPendingRequests()); + + pool.cancelSlotAllocation(allocationID2); + assertEquals(0, pool.getNumOfPendingRequests()); + //verify(resourceManagerGateway, times(1)).cancelSlotRequest(jid, any(JobMasterId.class), allocationID2); + + // 3. test the allocation is timed out in client side but the request is fulfilled in slot pool + AllocationID allocationID3 = new AllocationID(); + future = slotPoolGateway.allocateSlot(allocationID3, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + ResourceID resourceID = ResourceID.generate(); + AllocatedSlot allocatedSlot = SlotPoolTest.createAllocatedSlot(resourceID, allocationID3, jid, DEFAULT_TESTING_PROFILE); + slotPoolGateway.registerTaskManager(resourceID); + assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get()); + + assertEquals(0, pool.getNumOfPendingRequests()); + assertTrue(pool.getAllocatedSlots().contains(allocationID3)); + + pool.cancelSlotAllocation(allocationID3); + assertFalse(pool.getAllocatedSlots().contains(allocationID3)); + assertTrue(pool.getAvailableSlots().contains(allocationID3)); + } + + @Test + public 
void testProviderAndOwner() throws Exception { + final JobID jid = new JobID(); + + final SlotPool pool = new SlotPool( + rpcService, jid, + SystemClock.getInstance(), + Time.seconds(1), Time.days(1), + Time.seconds(3) // this is the timeout for the request tested here + ); + pool.start(JobMasterId.generate(), "foobar"); + ResourceManagerGateway resourceManagerGateway = SlotPoolTest.createResourceManagerGatewayMock(); + pool.connectToResourceManager(resourceManagerGateway); + + ScheduledUnit mockScheduledUnit = mock(ScheduledUnit.class); + Execution mockExecution = mock(Execution.class); + ExecutionVertex mockExecutionVertex = mock(ExecutionVertex.class); + when(mockScheduledUnit.getTaskToExecute()).thenReturn(mockExecution); + when(mockExecution.getVertex()).thenReturn(mockExecutionVertex); + when(mockExecutionVertex.getPreferredLocations()).thenReturn(null); + + // test the pending request is clear when timed out + CompletableFuture<SimpleSlot> future = pool.getSlotProvider().allocateSlot(mockScheduledUnit, true); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + // wait for the async call to execute + Thread.sleep(1000); --- End diff -- Enforcing certain interleavings with `Thread.sleep` is bound to fail eventually. Please do it differently: wait on an explicit completion signal for the asynchronous call (for example, a future or a latch) instead of sleeping for a fixed amount of time.
---