[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236181#comment-16236181 ]
ASF GitHub Bot commented on FLINK-6434: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148570875 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java --- @@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception { fail("wrong exception: " + e); } } + + @Test + public void testCancelSlotAllocation() throws Exception { + final JobID jid = new JobID(); + + final SlotPool pool = new SlotPool( + rpcService, jid, + SystemClock.getInstance(), + Time.days(1), Time.days(1), + Time.seconds(3) // this is the timeout for the request tested here + ); + pool.start(JobMasterId.generate(), "foobar"); + SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class); + + // 1. test the pending request is in waitingResourceManagerRequests + AllocationID allocationID = new AllocationID(); + CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + assertEquals(1, pool.getNumOfWaitingForResourceRequests()); + + pool.cancelSlotAllocation(allocationID); + assertEquals(0, pool.getNumOfWaitingForResourceRequests()); + + // 2. 
test the pending request is in pendingRequests + ResourceManagerGateway resourceManagerGateway = SlotPoolTest.createResourceManagerGatewayMock(); + pool.connectToResourceManager(resourceManagerGateway); + + AllocationID allocationID2 = new AllocationID(); + future = slotPoolGateway.allocateSlot(allocationID2, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + assertEquals(1, pool.getNumOfPendingRequests()); + + pool.cancelSlotAllocation(allocationID2); + assertEquals(0, pool.getNumOfPendingRequests()); + //verify(resourceManagerGateway, times(1)).cancelSlotRequest(jid, any(JobMasterId.class), allocationID2); + + // 3. test the allocation is timed out in client side but the request is fulfilled in slot pool + AllocationID allocationID3 = new AllocationID(); + future = slotPoolGateway.allocateSlot(allocationID3, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + ResourceID resourceID = ResourceID.generate(); + AllocatedSlot allocatedSlot = SlotPoolTest.createAllocatedSlot(resourceID, allocationID3, jid, DEFAULT_TESTING_PROFILE); + slotPoolGateway.registerTaskManager(resourceID); + assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get()); + + assertEquals(0, pool.getNumOfPendingRequests()); + assertTrue(pool.getAllocatedSlots().contains(allocationID3)); + + pool.cancelSlotAllocation(allocationID3); + assertFalse(pool.getAllocatedSlots().contains(allocationID3)); + assertTrue(pool.getAvailableSlots().contains(allocationID3)); + } + + @Test + public 
void testProviderAndOwner() throws Exception { --- End diff -- What does this test test? Maybe add a short description. > There may be allocatedSlots leak in SlotPool > -------------------------------------------- > > Key: FLINK-6434 > URL: https://issues.apache.org/jira/browse/FLINK-6434 > Project: Flink > Issue Type: Bug > Components: Cluster Management > Reporter: shuai.xu > Assignee: shuai.xu > Priority: Major > Labels: flip-6 > > If the call to allocateSlot() from Execution to SlotPool times out, the job will > begin to fail over, but the pending request is still in SlotPool. If a new slot > is then registered with SlotPool, it may fulfill the outdated pending request > and be added to allocatedSlots, but it will never be used and will never be > recycled. -- This message was sent by Atlassian JIRA (v6.4.14#64029)