[ https://issues.apache.org/jira/browse/HIVE-2420?focusedWorklogId=590059&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-590059 ]
ASF GitHub Bot logged work on HIVE-2420:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Apr/21 00:07
            Start Date: 28/Apr/21 00:07
    Worklog Time Spent: 10m
      Work Description: Dawn2111 commented on a change in pull request #2065:
URL: https://github.com/apache/hive/pull/2065#discussion_r621674045


##########
File path: ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
##########
@@ -1110,6 +1110,94 @@ public void testMoveSessionsMultiPool() throws Exception {
     assertFalse(allSessionProviders.get("A").getSessions().contains(sessionA1));
   }
 
+  @Test(timeout=10000)
+  public void testDelayedMoveSessions() throws Exception {
+    final HiveConf conf = createConfForDelayedMove();
+    MockQam qam = new MockQam();
+    WMFullResourcePlan plan = new WMFullResourcePlan(plan(), Lists.newArrayList(
+        pool("A", 2, 0.6f), pool("B", 1, 0.4f)));
+    plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B")));
+    final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
+    wm.start();
+
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, mappingInput("A"), conf);
+
+    // [A: 1, B: 0]
+    Map<String, SessionTriggerProvider> allSessionProviders = wm.getAllSessionTriggerProviders();
+    assertEquals(1, allSessionProviders.get("A").getSessions().size());
+    assertEquals(0, allSessionProviders.get("B").getSessions().size());
+    assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA1));
+    assertFalse(allSessionProviders.get("B").getSessions().contains(sessionA1));
+    assertEquals(0.6f, sessionA1.getClusterFraction(), EPSILON);
+    assertEquals("A", sessionA1.getPoolName());
+
+    // If dest pool has capacity, move immediately
+    // [A: 0, B: 1]
+    Future<Boolean> future = wm.applyMoveSessionAsync(sessionA1, "B");
+    assertNotNull(future.get());
+    assertTrue(future.get());
+    wm.addTestEvent().get();
+    allSessionProviders = wm.getAllSessionTriggerProviders();
+    assertEquals(0, allSessionProviders.get("A").getSessions().size());
+    assertEquals(1, allSessionProviders.get("B").getSessions().size());
+    assertFalse(allSessionProviders.get("A").getSessions().contains(sessionA1));
+    assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA1));
+    assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON);
+    assertEquals("B", sessionA1.getPoolName());
+
+    WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, mappingInput("A"), conf);
+    // [A: 1, B: 1]
+    allSessionProviders = wm.getAllSessionTriggerProviders();
+    assertEquals(1, allSessionProviders.get("A").getSessions().size());
+    assertEquals(1, allSessionProviders.get("B").getSessions().size());
+    assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA2));
+    assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA1));
+    assertEquals(0.6f, sessionA2.getClusterFraction(), EPSILON);
+    assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON);
+    assertEquals("A", sessionA2.getPoolName());
+    assertEquals("B", sessionA1.getPoolName());
+
+    // Dest pool is maxed out. Keep running in source pool
+    // [A: 1, B: 1]
+    future = wm.applyMoveSessionAsync(sessionA2, "B");
+    assertNotNull(future.get());
+    assertFalse(future.get());
+    wm.addTestEvent().get();
+    allSessionProviders = wm.getAllSessionTriggerProviders();
+    assertEquals(1, allSessionProviders.get("A").getSessions().size());
+    assertEquals(1, allSessionProviders.get("B").getSessions().size());
+    assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA2));
+    assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA1));
+    assertEquals(0.6f, sessionA2.getClusterFraction(), EPSILON);
+    assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON);
+    assertEquals("A", sessionA2.getPoolName());
+    assertEquals("B", sessionA1.getPoolName());
+
+    // A has queued requests. The new requests should get accepted. The delayed move should be killed
+    WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, mappingInput("A"), conf);
+    WmTezSession sessionA4 = (WmTezSession) wm.getSession(null, mappingInput("A"), conf);
+
+    while(sessionA2.isOpen()) {
+      Thread.sleep(100);
+    }
+    assertNull(sessionA2.getPoolName());
+    assertEquals("Destination pool B is full. Killing query.", sessionA2.getReasonForKill());
+
+    // [A: 2, B: 1]
+    allSessionProviders = wm.getAllSessionTriggerProviders();
+    assertEquals(2, allSessionProviders.get("A").getSessions().size());
+    assertEquals(1, allSessionProviders.get("B").getSessions().size());
+    assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA3));
+    assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA4));
+    assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA1));
+    assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON);
+    assertEquals(0.3f, sessionA4.getClusterFraction(), EPSILON);
+    assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON);
+    assertEquals("A", sessionA3.getPoolName());
+    assertEquals("A", sessionA4.getPoolName());
+    assertEquals("B", sessionA1.getPoolName());

Review comment:
   Added


##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java
##########
@@ -47,8 +47,10 @@ public void applyAction(final Map<WmTezSession, Trigger> queriesViolated) {
           break;
         case MOVE_TO_POOL:
           String destPoolName = entry.getValue().getAction().getPoolName();
-          Future<Boolean> moveFuture = wm.applyMoveSessionAsync(wmTezSession, destPoolName);
-          moveFutures.put(wmTezSession, moveFuture);
+          if (!wmTezSession.isDelayedMove()) {

Review comment:
   The trigger validator thread iterates through all the sessions and checks for trigger violations at specified intervals. Even for a move trigger without the delayed-move config set, the trigger would be validated against all the sessions every 500 ms or so. But yes, if the delayed-move config is set, the session would initially be found in violation and would then be treated as a no-op when the trigger's action is invoked.
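The guard discussed above is easier to see in isolation. The sketch below is illustrative only: Session, MoveRequester, MoveActionSketch and applyMoveActions are stand-in names, not the Hive classes; only isDelayedMove() and the async move future mirror the patch. The point it shows is that a move is submitted only for sessions not already pending a delayed move, so repeated trigger-validation cycles do not pile up duplicate move requests.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Illustrative stand-ins for WmTezSession and WorkloadManager; not the Hive classes.
interface Session {
  boolean isDelayedMove();
}

interface MoveRequester {
  Future<Boolean> applyMoveSessionAsync(Session session, String destPool);
}

public class MoveActionSketch {
  /**
   * Mirrors the guarded branch from the patch: submit an async move only for
   * sessions that are not already pending a delayed move, so repeated trigger
   * validation passes do not queue duplicate move requests.
   */
  static Map<Session, Future<Boolean>> applyMoveActions(
      Map<Session, String> violatedSessionsToDestPool, MoveRequester wm) {
    Map<Session, Future<Boolean>> moveFutures = new HashMap<>();
    for (Map.Entry<Session, String> entry : violatedSessionsToDestPool.entrySet()) {
      Session session = entry.getKey();
      if (!session.isDelayedMove()) {          // the guard added by the patch
        moveFutures.put(session, wm.applyMoveSessionAsync(session, entry.getValue()));
      }
      // else: the session is already queued as a delayed move; treat as a no-op.
    }
    return moveFutures;
  }

  public static void main(String[] args) {
    Session pending = () -> true;   // already a delayed move -> skipped
    Session fresh = () -> false;    // not yet moved -> move submitted
    MoveRequester wm = (s, pool) -> CompletableFuture.completedFuture(true);
    Map<Session, String> violated = new HashMap<>();
    violated.put(pending, "B");
    violated.put(fresh, "B");
    System.out.println(applyMoveActions(violated, wm).size()); // prints 1
  }
}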
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -790,45 +842,72 @@ private void dumpPoolState(PoolState ps, List<String> set) {
     }
   }
 
-  private void handleMoveSessionOnMasterThread(final MoveSession moveSession,
-      final WmThreadSyncWork syncWork,
-      final HashSet<String> poolsToRedistribute,
-      final Map<WmTezSession, GetRequest> toReuse,
-      final Map<WmTezSession, WmEvent> recordMoveEvents) {
+  private static enum MoveSessionResult {
+    OK, // Normal case - the session was moved.
+    KILLED, // Killed because destination pool was full and delayed move is false.
+    CONVERTED_TO_DELAYED_MOVE, // the move session was added to the pool's delayed moves as the dest. pool was full
+                               // and delayed move is true.
+    ERROR
+  }
+
+  private MoveSessionResult handleMoveSessionOnMasterThread(final MoveSession moveSession,
+      final WmThreadSyncWork syncWork,
+      final HashSet<String> poolsToRedistribute,
+      final Map<WmTezSession, GetRequest> toReuse,
+      final Map<WmTezSession, WmEvent> recordMoveEvents,
+      final boolean convertToDelayedMove) {
     String destPoolName = moveSession.destPool;
-    LOG.info("Handling move session event: {}", moveSession);
+    LOG.info("Handling move session event: {}, Convert to Delayed Move: {}", moveSession, convertToDelayedMove);
     if (validMove(moveSession.srcSession, destPoolName)) {
+      String srcPoolName = moveSession.srcSession.getPoolName();
+      PoolState srcPool = pools.get(srcPoolName);
+      boolean capacityAvailableInDest = capacityAvailable(destPoolName);
+      // If delayed move is set to true and if destination pool doesn't have enough capacity, don't kill the query.
+      // Let the query run in source pool. Add the session to the source pool's delayed move sessions.
+      if (convertToDelayedMove && !capacityAvailableInDest) {
+        srcPool.delayedMoveSessions.add(moveSession);
+        moveSession.srcSession.setDelayedMove(true);

Review comment:
   When a pool is updated or destroyed as a consequence of disabling WLM, all the sessions in the pool are removed, and the delayed move sessions are removed at that point as well - see PoolState.extractAllSessionsToKill().
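To summarize the lifecycle exercised by the test, the toy model below is a simplified sketch, not the actual WorkloadManager code: Pool, processDelayedMove and the exact kill condition are assumptions made for illustration. It shows the three outcomes for a delayed-move session on each master-thread pass: it completes once the destination frees up, it is killed when the source pool needs the slot for queued requests, and otherwise it keeps running in the source pool.

import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the delayed-move lifecycle described in the test and review comments above;
// names and logic are illustrative, not the actual WorkloadManager implementation.
public class DelayedMoveSketch {

  enum Outcome { MOVED, STILL_WAITING, KILLED }

  static final class Pool {
    final String name;
    final int maxSessions;
    int activeSessions;
    final Deque<String> delayedMoveSessions = new ArrayDeque<>();
    Pool(String name, int maxSessions) { this.name = name; this.maxSessions = maxSessions; }
    boolean hasCapacity() { return activeSessions < maxSessions; }
  }

  /**
   * One master-thread pass over a single delayed move: complete it if the destination
   * now has room, kill it if the source pool needs the slot for queued requests
   * (a simplified stand-in for the real condition), otherwise leave it waiting.
   */
  static Outcome processDelayedMove(String session, Pool src, Pool dest, int queuedInSrc) {
    if (dest.hasCapacity()) {
      src.activeSessions--;
      dest.activeSessions++;
      src.delayedMoveSessions.remove(session);
      return Outcome.MOVED;
    }
    if (queuedInSrc > 0 && src.activeSessions + queuedInSrc > src.maxSessions) {
      // Destination is still full and the source pool needs the slot: the delayed-move
      // session is killed, mirroring "Destination pool B is full. Killing query." in the test.
      src.activeSessions--;
      src.delayedMoveSessions.remove(session);
      return Outcome.KILLED;
    }
    return Outcome.STILL_WAITING;
  }

  public static void main(String[] args) {
    Pool a = new Pool("A", 2);
    Pool b = new Pool("B", 1);
    a.activeSessions = 1; b.activeSessions = 1;   // sessionA2 in A, sessionA1 in B
    a.delayedMoveSessions.add("sessionA2");       // move A2 -> B was converted to a delayed move
    System.out.println(processDelayedMove("sessionA2", a, b, 0)); // STILL_WAITING
    System.out.println(processDelayedMove("sessionA2", a, b, 2)); // KILLED (A3, A4 queued)
  }
}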
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 590059)
    Time Spent: 20m  (was: 10m)

> partition pruner expr is not populated due to some bug in ppd
> --------------------------------------------------------------
>
>                 Key: HIVE-2420
>                 URL: https://issues.apache.org/jira/browse/HIVE-2420
>             Project: Hive
>          Issue Type: Bug
>            Reporter: He Yongqiang
>            Assignee: He Yongqiang
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: HIVE-2420.reproduce.diff
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)