[ 
https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15430431#comment-15430431
 ] 

ASF GitHub Bot commented on FLINK-4363:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2400#discussion_r75646889
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
    @@ -35,27 +79,634 @@
      */
     public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
     
    -   /** The unique resource ID of this TaskExecutor */
    +   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutor.class);
    +
    +   /** Return code for critical errors during the runtime */
    +   private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
    +
    +   /** The name of the TaskManager actor */
    +   private static final String TASK_MANAGER_NAME = "taskmanager";
    +
    +   /** The unique resource ID of this TaskManager */
        private final ResourceID resourceID;
     
        /** The access to the leader election and metadata storage services */
        private final HighAvailabilityServices haServices;
     
    -   // --------- resource manager --------
    +   /** The task manager configuration */
    +   private final TaskManagerConfiguration taskManagerConfig;
    +
    +   /** The connection information of the task manager */
    +   private final InstanceConnectionInfo connectionInfo;
    +
    +   /** The I/O manager component in the task manager */
    +   private final IOManager ioManager;
    +
    +   /** The memory manager component in the task manager */
    +   private final MemoryManager memoryManager;
    +
    +   /** The network component in the task manager */
    +   private final NetworkEnvironment networkEnvironment;
    +
    +   /** The number of slots in the task manager, should be 1 for YARN */
    +   private final int numberOfSlots;
     
    +   // --------- resource manager --------
        private TaskExecutorToResourceManagerConnection 
resourceManagerConnection;
     
        // 
------------------------------------------------------------------------
     
        public TaskExecutor(
    +                   TaskManagerConfiguration taskManagerConfig,
    +                   ResourceID resourceID,
    +                   InstanceConnectionInfo connectionInfo,
    +                   MemoryManager memoryManager,
    +                   IOManager ioManager,
    +                   NetworkEnvironment networkEnvironment,
    +                   int numberOfSlots,
                        RpcService rpcService,
    -                   HighAvailabilityServices haServices,
    -                   ResourceID resourceID) {
    +                   HighAvailabilityServices haServices) {
     
                super(rpcService);
     
    -           this.haServices = checkNotNull(haServices);
    +           this.taskManagerConfig = checkNotNull(taskManagerConfig);
                this.resourceID = checkNotNull(resourceID);
    +           this.connectionInfo = checkNotNull(connectionInfo);
    +           this.memoryManager = checkNotNull(memoryManager);
    +           this.ioManager = checkNotNull(ioManager);
    +           this.networkEnvironment = checkNotNull(networkEnvironment);
    +           this.numberOfSlots = checkNotNull(numberOfSlots);
    +           this.haServices = checkNotNull(haServices);
    +   }
    +
    +   /**
    +    * Starts and runs the TaskManager.
    +    * <p/>
    +    * This method first tries to select the network interface to use for 
the TaskManager
    +    * communication. The network interface is used both for the actor 
communication
    +    * (coordination) as well as for the data exchange between task 
managers. Unless
    +    * the hostname/interface is explicitly configured in the 
configuration, this
    +    * method will try out various interfaces and methods to connect to the 
JobManager
    +    * and select the one where the connection attempt is successful.
    +    * <p/>
    +    * After selecting the network interface, this method brings up an 
actor system
    +    * for the TaskManager and its actors, starts the TaskManager's services
    +    * (library cache, shuffle network stack, ...), and starts the 
TaskManager itself.
    +    *
    +    * @param configuration    The configuration for the TaskManager.
    +    * @param taskManagerClass The actor class to instantiate.
    +    *                         Allows to use TaskManager subclasses for 
example for YARN.
    +    */
    +   public static void selectNetworkInterfaceAndRunTaskManager(
    +           Configuration configuration,
    +           ResourceID resourceID,
    +           Class<? extends TaskManager> taskManagerClass) throws Exception 
{
    +
    +           Tuple2<String, Integer> tuple2 = 
selectNetworkInterfaceAndPort(configuration);
    +
    +           runTaskManager(tuple2._1(), resourceID, tuple2._2(), 
configuration, taskManagerClass);
    +   }
    +
    +   private static Tuple2<String, Integer> 
selectNetworkInterfaceAndPort(Configuration configuration)
    +           throws Exception {
    +           String taskManagerHostname = 
configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null);
    +           if (taskManagerHostname != null) {
    +                   LOG.info("Using configured hostname/address for 
TaskManager: " + taskManagerHostname);
    +           } else {
    +                   LeaderRetrievalService leaderRetrievalService = 
LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
    +                   FiniteDuration lookupTimeout = 
AkkaUtils.getLookupTimeout(configuration);
    +
    +                   InetAddress taskManagerAddress = 
LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, 
lookupTimeout);
    +                   taskManagerHostname = taskManagerAddress.getHostName();
    +                   LOG.info("TaskManager will use hostname/address '{}' 
({}) for communication.",
    +                           taskManagerHostname, 
taskManagerAddress.getHostAddress());
    +           }
    +
    +           // if no task manager port has been configured, use 0 (system 
will pick any free port)
    +           int actorSystemPort = 
configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
    +           if (actorSystemPort < 0 || actorSystemPort > 65535) {
    +                   throw new IllegalConfigurationException("Invalid value 
for '" +
    +                           ConfigConstants.TASK_MANAGER_IPC_PORT_KEY +
    +                           "' (port for the TaskManager actor system) : " 
+ actorSystemPort +
    +                           " - Leave config parameter empty or use 0 to 
let the system choose a port automatically.");
    +           }
    +
    +           return new Tuple2<>(taskManagerHostname, actorSystemPort);
    +   }
    +
    +   /**
    +    * Starts and runs the TaskManager. Brings up an actor system for the 
TaskManager and its
    +    * actors, starts the TaskManager's services (library cache, shuffle 
network stack, ...),
    +    * and starts the TaskManager itself.
    +    * <p/>
    +    * This method will also spawn a process reaper for the TaskManager 
(kill the process if
    +    * the actor fails) and optionally start the JVM memory logging thread.
    +    *
    +    * @param taskManagerHostname The hostname/address of the interface 
where the actor system
    +    *                            will communicate.
    +    * @param resourceID          The id of the resource which the task 
manager will run on.
    +    * @param actorSystemPort     The port at which the actor system will 
communicate.
    +    * @param configuration       The configuration for the TaskManager.
    +    * @param taskManagerClass    The actor class to instantiate. Allows 
the use of TaskManager
    +    *                            subclasses for example for YARN.
    +    */
    +   private static void runTaskManager(
    +           String taskManagerHostname,
    +           ResourceID resourceID,
    +           int actorSystemPort,
    +           Configuration configuration,
    +           Class<? extends TaskManager> taskManagerClass) throws Exception 
{
    +
    +           LOG.info("Starting TaskManager");
    +
    +           // Bring up the TaskManager actor system first, bind it to the 
given address.
    +
    +           LOG.info("Starting TaskManager actor system at " +
    +                   NetUtils.hostAndPortToUrlString(taskManagerHostname, 
actorSystemPort));
    +
    +           ActorSystem taskManagerSystem;
    --- End diff --
    
    Would make this final.


> Implement TaskManager basic startup of all components in java
> -------------------------------------------------------------
>
>                 Key: FLINK-4363
>                 URL: https://issues.apache.org/jira/browse/FLINK-4363
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cluster Management
>            Reporter: Zhijiang Wang
>            Assignee: Zhijiang Wang
>
> Similar with current {{TaskManager}},but implement initialization and startup 
> all components in java instead of scala.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to