KarmaGYZ commented on a change in pull request #13311: URL: https://github.com/apache/flink/pull/13311#discussion_r488366730
########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java ########## @@ -508,52 +371,103 @@ private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainer return matchingContainerRequests; } - @Override - public void onShutdownRequest() { - onFatalError(new ResourceManagerException(ERROR_MASSAGE_ON_SHUTDOWN_REQUEST)); - } + private ContainerLaunchContext createTaskExecutorLaunchContext( + ResourceID containerId, + String host, + TaskExecutorProcessSpec taskExecutorProcessSpec) throws Exception { - @Override - public void onNodesUpdated(List<NodeReport> list) { - // We are not interested in node updates - } + // init the ContainerLaunchContext + final String currDir = configuration.getCurrentDir(); - @Override - public void onError(Throwable error) { - onFatalError(error); - } + final ContaineredTaskManagerParameters taskManagerParameters = + ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec); - // ------------------------------------------------------------------------ - // NMClientAsync CallbackHandler methods - // ------------------------------------------------------------------------ - @Override - public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> map) { - log.debug("Succeeded to call YARN Node Manager to start container {}.", containerId); - } + log.info("TaskExecutor {} will be started on {} with {}.", + containerId.getStringWithMetadata(), + host, + taskExecutorProcessSpec); - @Override - public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) { - // We are not interested in getting container status + final Configuration taskManagerConfig = BootstrapTools.cloneConfiguration(flinkConfig); + taskManagerConfig.set(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID, containerId.getResourceIdString()); + taskManagerConfig.set(TaskManagerOptionsInternal.TASK_MANAGER_RESOURCE_ID_METADATA, containerId.getMetadata()); + 
+ final String taskManagerDynamicProperties = + BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, taskManagerConfig); + + log.debug("TaskManager configuration: {}", taskManagerConfig); + + final ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext( + flinkConfig, + yarnConfig, + configuration, + taskManagerParameters, + taskManagerDynamicProperties, + currDir, + YarnTaskExecutorRunner.class, + log); + + taskExecutorLaunchContext.getEnvironment() + .put(ENV_FLINK_NODE_ID, host); + return taskExecutorLaunchContext; } - @Override - public void onContainerStopped(ContainerId containerId) { - log.debug("Succeeded to call YARN Node Manager to stop container {}.", containerId); + @VisibleForTesting + Optional<Resource> getContainerResource(TaskExecutorProcessSpec taskExecutorProcessSpec) { + return taskExecutorProcessSpecContainerResourceAdapter.tryComputeContainerResource(taskExecutorProcessSpec); } - @Override - public void onStartContainerError(ContainerId containerId, Throwable t) { - runAsync(() -> releaseFailedContainerAndRequestNewContainerIfRequired(containerId, t)); + private RegisterApplicationMasterResponse registerApplicationMaster() throws Exception { + final int restPort; + final String webInterfaceUrl = configuration.getWebInterfaceUrl(); + final String rpcAddress = configuration.getRpcAddress(); + + if (webInterfaceUrl != null) { + final int lastColon = webInterfaceUrl.lastIndexOf(':'); Review comment: I'm not quite familiar with that logic. I found that the `webInterfaceUrl` is originally derived in `RestServerEndpoint`, so it should always have a port. @tillrohrmann Could you help ensure this? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org