Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4729#discussion_r148808482 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java --- @@ -227,14 +241,27 @@ public void startNewWorker(ResourceProfile resourceProfile) { } @Override - public boolean stopWorker(ResourceID resourceID) { - // TODO: Implement to stop the worker - return false; + public boolean stopWorker(YarnWorkerNode workerNode) { + workerNodeMap.remove(workerNode.getResourceID().toString()); + if (workerNode != null) { + Container container = workerNode.getYarnContainer(); + log.info("Stopping container {}.", container.getId().toString()); + // release the container on the node manager + try { + nodeManagerClient.stopContainer(container.getId(), container.getNodeId()); + } catch (Throwable t) { + log.error("Error while calling YARN Node Manager to stop container", t); --- End diff -- this should be a warning
---