Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5931#discussion_r184831311

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -293,21 +293,16 @@ public void startNewWorker(ResourceProfile resourceProfile) {
         }

         @Override
    -    public boolean stopWorker(YarnWorkerNode workerNode) {
    -        if (workerNode != null) {
    -            Container container = workerNode.getContainer();
    -            log.info("Stopping container {}.", container.getId());
    -            // release the container on the node manager
    -            try {
    -                nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
    -            } catch (Throwable t) {
    -                log.warn("Error while calling YARN Node Manager to stop container", t);
    -            }
    -            resourceManagerClient.releaseAssignedContainer(container.getId());
    -            workerNodeMap.remove(workerNode.getResourceID());
    -        } else {
    -            log.error("Can not find container for null workerNode.");
    +    public boolean stopWorker(final YarnWorkerNode workerNode) {
    +        final Container container = workerNode.getContainer();
    +        log.info("Stopping container {}.", container.getId());
    +        try {
    +            nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
    +        } catch (final Exception e) {
    +            log.warn("Error while calling YARN Node Manager to stop container", e);
    --- End diff --

    The previous version caught `Throwable` here; it now catches `Exception`. What is the reason for the change? Also, if we change `Throwable` to `Exception` here, then maybe we should also change the other places that perform a similar operation with `nodeManagerClient`, e.g.
    ![image](https://user-images.githubusercontent.com/7480427/39389518-e609686c-4abb-11e8-99df-b0cd013a58ef.png)
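
To make the question concrete, here is a minimal, self-contained sketch of what narrowing the catch clause changes. It is not Flink or YARN code: `Action`, `withCatchException`, and `withCatchThrowable` are hypothetical names standing in for the `nodeManagerClient.stopContainer(...)` call and the two variants of the surrounding `try` block. The only point it illustrates is that `catch (Exception e)` no longer swallows subclasses of `java.lang.Error`, while `catch (Throwable t)` does.

```java
class CatchScopeDemo {

    /** Hypothetical stand-in for a call such as nodeManagerClient.stopContainer(...). */
    interface Action {
        void run() throws Exception;
    }

    /** New behavior in the diff: only Exception and its subclasses are swallowed. */
    static String withCatchException(Action action) {
        try {
            action.run();
            return "completed";
        } catch (Exception e) {
            // Checked and runtime exceptions end up here; Errors propagate to the caller.
            return "handled";
        }
    }

    /** Old behavior in the diff: every Throwable, including Error, is swallowed. */
    static String withCatchThrowable(Action action) {
        try {
            action.run();
            return "completed";
        } catch (Throwable t) {
            return "handled";
        }
    }

    public static void main(String[] args) {
        Action failsWithException = () -> { throw new java.io.IOException("simulated"); };
        Action failsWithError = () -> { throw new LinkageError("simulated"); };

        // Both variants swallow an ordinary exception.
        System.out.println(withCatchException(failsWithException));
        System.out.println(withCatchThrowable(failsWithException));

        // Only the Throwable variant swallows an Error.
        System.out.println(withCatchThrowable(failsWithError));
        try {
            withCatchException(failsWithError);
        } catch (LinkageError e) {
            System.out.println("Error propagated past catch (Exception)");
        }
    }
}
```

Under this reading, switching to `Exception` means a failure such as a `LinkageError` or `OutOfMemoryError` raised during the stop call would no longer be logged and ignored, so the cleanup that follows the `try` block would be skipped. Whether that is the intent is exactly what the question above asks.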
---