KarmaGYZ commented on a change in pull request #13311:
URL: https://github.com/apache/flink/pull/13311#discussion_r488366730



##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -508,52 +371,103 @@ private void 
removeContainerRequest(AMRMClient.ContainerRequest pendingContainer
                return matchingContainerRequests;
        }
 
-       @Override
-       public void onShutdownRequest() {
-               onFatalError(new 
ResourceManagerException(ERROR_MASSAGE_ON_SHUTDOWN_REQUEST));
-       }
+       private ContainerLaunchContext createTaskExecutorLaunchContext(
+               ResourceID containerId,
+               String host,
+               TaskExecutorProcessSpec taskExecutorProcessSpec) throws 
Exception {
 
-       @Override
-       public void onNodesUpdated(List<NodeReport> list) {
-               // We are not interested in node updates
-       }
+               // init the ContainerLaunchContext
+               final String currDir = configuration.getCurrentDir();
 
-       @Override
-       public void onError(Throwable error) {
-               onFatalError(error);
-       }
+               final ContaineredTaskManagerParameters taskManagerParameters =
+                       ContaineredTaskManagerParameters.create(flinkConfig, 
taskExecutorProcessSpec);
 
-       // 
------------------------------------------------------------------------
-       //  NMClientAsync CallbackHandler methods
-       // 
------------------------------------------------------------------------
-       @Override
-       public void onContainerStarted(ContainerId containerId, Map<String, 
ByteBuffer> map) {
-               log.debug("Succeeded to call YARN Node Manager to start 
container {}.", containerId);
-       }
+               log.info("TaskExecutor {} will be started on {} with {}.",
+                       containerId.getStringWithMetadata(),
+                       host,
+                       taskExecutorProcessSpec);
 
-       @Override
-       public void onContainerStatusReceived(ContainerId containerId, 
ContainerStatus containerStatus) {
-               // We are not interested in getting container status
+               final Configuration taskManagerConfig = 
BootstrapTools.cloneConfiguration(flinkConfig);
+               
taskManagerConfig.set(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID, 
containerId.getResourceIdString());
+               
taskManagerConfig.set(TaskManagerOptionsInternal.TASK_MANAGER_RESOURCE_ID_METADATA,
 containerId.getMetadata());
+
+               final String taskManagerDynamicProperties =
+                       
BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, 
taskManagerConfig);
+
+               log.debug("TaskManager configuration: {}", taskManagerConfig);
+
+               final ContainerLaunchContext taskExecutorLaunchContext = 
Utils.createTaskExecutorContext(
+                       flinkConfig,
+                       yarnConfig,
+                       configuration,
+                       taskManagerParameters,
+                       taskManagerDynamicProperties,
+                       currDir,
+                       YarnTaskExecutorRunner.class,
+                       log);
+
+               taskExecutorLaunchContext.getEnvironment()
+                       .put(ENV_FLINK_NODE_ID, host);
+               return taskExecutorLaunchContext;
        }
 
-       @Override
-       public void onContainerStopped(ContainerId containerId) {
-               log.debug("Succeeded to call YARN Node Manager to stop 
container {}.", containerId);
+       @VisibleForTesting
+       Optional<Resource> getContainerResource(TaskExecutorProcessSpec 
taskExecutorProcessSpec) {
+               return 
taskExecutorProcessSpecContainerResourceAdapter.tryComputeContainerResource(taskExecutorProcessSpec);
        }
 
-       @Override
-       public void onStartContainerError(ContainerId containerId, Throwable t) 
{
-               runAsync(() -> 
releaseFailedContainerAndRequestNewContainerIfRequired(containerId, t));
+       private RegisterApplicationMasterResponse registerApplicationMaster() 
throws Exception {
+               final int restPort;
+               final String webInterfaceUrl = 
configuration.getWebInterfaceUrl();
+               final String rpcAddress = configuration.getRpcAddress();
+
+               if (webInterfaceUrl != null) {
+                       final int lastColon = webInterfaceUrl.lastIndexOf(':');

Review comment:
       I'm not very familiar with that logic. From what I can find, the 
`webInterfaceUrl` is originally derived in `RestServerEndpoint`, so it should 
always have a port.
   
   @tillrohrmann Could you help confirm this?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to