[ https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15430431#comment-15430431 ]
ASF GitHub Bot commented on FLINK-4363: --------------------------------------- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2400#discussion_r75646889 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java --- @@ -35,27 +79,634 @@ */ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { - /** The unique resource ID of this TaskExecutor */ + private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class); + + /** Return code for critical errors during the runtime */ + private static final int RUNTIME_FAILURE_RETURN_CODE = 2; + + /** The name of the TaskManager actor */ + private static final String TASK_MANAGER_NAME = "taskmanager"; + + /** The unique resource ID of this TaskManager */ private final ResourceID resourceID; /** The access to the leader election and metadata storage services */ private final HighAvailabilityServices haServices; - // --------- resource manager -------- + /** The task manager configuration */ + private final TaskManagerConfiguration taskManagerConfig; + + /** The connection information of the task manager */ + private final InstanceConnectionInfo connectionInfo; + + /** The I/O manager component in the task manager */ + private final IOManager ioManager; + + /** The memory manager component in the task manager */ + private final MemoryManager memoryManager; + + /** The network component in the task manager */ + private final NetworkEnvironment networkEnvironment; + + /** The number of slots in the task manager, should be 1 for YARN */ + private final int numberOfSlots; + // --------- resource manager -------- private TaskExecutorToResourceManagerConnection resourceManagerConnection; // ------------------------------------------------------------------------ public TaskExecutor( + TaskManagerConfiguration taskManagerConfig, + ResourceID resourceID, + InstanceConnectionInfo connectionInfo, + MemoryManager memoryManager, 
+ IOManager ioManager, + NetworkEnvironment networkEnvironment, + int numberOfSlots, RpcService rpcService, - HighAvailabilityServices haServices, - ResourceID resourceID) { + HighAvailabilityServices haServices) { super(rpcService); - this.haServices = checkNotNull(haServices); + this.taskManagerConfig = checkNotNull(taskManagerConfig); this.resourceID = checkNotNull(resourceID); + this.connectionInfo = checkNotNull(connectionInfo); + this.memoryManager = checkNotNull(memoryManager); + this.ioManager = checkNotNull(ioManager); + this.networkEnvironment = checkNotNull(networkEnvironment); + this.numberOfSlots = checkNotNull(numberOfSlots); + this.haServices = checkNotNull(haServices); + } + + /** + * Starts and runs the TaskManager. + * <p/> + * This method first tries to select the network interface to use for the TaskManager + * communication. The network interface is used both for the actor communication + * (coordination) as well as for the data exchange between task managers. Unless + * the hostname/interface is explicitly configured in the configuration, this + * method will try out various interfaces and methods to connect to the JobManager + * and select the one where the connection attempt is successful. + * <p/> + * After selecting the network interface, this method brings up an actor system + * for the TaskManager and its actors, starts the TaskManager's services + * (library cache, shuffle network stack, ...), and starts the TaskManager itself. + * + * @param configuration The configuration for the TaskManager. + * @param taskManagerClass The actor class to instantiate. + * Allows to use TaskManager subclasses for example for YARN. + */ + public static void selectNetworkInterfaceAndRunTaskManager( + Configuration configuration, + ResourceID resourceID, + Class<? 
extends TaskManager> taskManagerClass) throws Exception { + + Tuple2<String, Integer> tuple2 = selectNetworkInterfaceAndPort(configuration); + + runTaskManager(tuple2._1(), resourceID, tuple2._2(), configuration, taskManagerClass); + } + + private static Tuple2<String, Integer> selectNetworkInterfaceAndPort(Configuration configuration) + throws Exception { + String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null); + if (taskManagerHostname != null) { + LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname); + } else { + LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration); + FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration); + + InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, lookupTimeout); + taskManagerHostname = taskManagerAddress.getHostName(); + LOG.info("TaskManager will use hostname/address '{}' ({}) for communication.", + taskManagerHostname, taskManagerAddress.getHostAddress()); + } + + // if no task manager port has been configured, use 0 (system will pick any free port) + int actorSystemPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0); + if (actorSystemPort < 0 || actorSystemPort > 65535) { + throw new IllegalConfigurationException("Invalid value for '" + + ConfigConstants.TASK_MANAGER_IPC_PORT_KEY + + "' (port for the TaskManager actor system) : " + actorSystemPort + + " - Leave config parameter empty or use 0 to let the system choose a port automatically."); + } + + return new Tuple2<>(taskManagerHostname, actorSystemPort); + } + + /** + * Starts and runs the TaskManager. Brings up an actor system for the TaskManager and its + * actors, starts the TaskManager's services (library cache, shuffle network stack, ...), + * and starts the TaskManager itself. 
+ * <p/> + * This method will also spawn a process reaper for the TaskManager (kill the process if + * the actor fails) and optionally start the JVM memory logging thread. + * + * @param taskManagerHostname The hostname/address of the interface where the actor system + * will communicate. + * @param resourceID The id of the resource which the task manager will run on. + * @param actorSystemPort The port at which the actor system will communicate. + * @param configuration The configuration for the TaskManager. + * @param taskManagerClass The actor class to instantiate. Allows the use of TaskManager + * subclasses for example for YARN. + */ + private static void runTaskManager( + String taskManagerHostname, + ResourceID resourceID, + int actorSystemPort, + Configuration configuration, + Class<? extends TaskManager> taskManagerClass) throws Exception { + + LOG.info("Starting TaskManager"); + + // Bring up the TaskManager actor system first, bind it to the given address. + + LOG.info("Starting TaskManager actor system at " + + NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort)); + + ActorSystem taskManagerSystem; --- End diff -- Would make this final. > Implement TaskManager basic startup of all components in java > ------------------------------------------------------------- > > Key: FLINK-4363 > URL: https://issues.apache.org/jira/browse/FLINK-4363 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management > Reporter: Zhijiang Wang > Assignee: Zhijiang Wang > > Similar to the current {{TaskManager}}, but implements the initialization and startup of > all components in Java instead of Scala. -- This message was sent by Atlassian JIRA (v6.3.4#6332)