[ https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15430423#comment-15430423 ]
ASF GitHub Bot commented on FLINK-4363: --------------------------------------- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2400#discussion_r75646080 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java --- @@ -35,27 +79,634 @@ */ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { - /** The unique resource ID of this TaskExecutor */ + private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class); + + /** Return code for critical errors during the runtime */ + private static final int RUNTIME_FAILURE_RETURN_CODE = 2; + + /** The name of the TaskManager actor */ + private static final String TASK_MANAGER_NAME = "taskmanager"; + + /** The unique resource ID of this TaskManager */ private final ResourceID resourceID; /** The access to the leader election and metadata storage services */ private final HighAvailabilityServices haServices; - // --------- resource manager -------- + /** The task manager configuration */ + private final TaskManagerConfiguration taskManagerConfig; + + /** The connection information of the task manager */ + private final InstanceConnectionInfo connectionInfo; + + /** The I/O manager component in the task manager */ + private final IOManager ioManager; + + /** The memory manager component in the task manager */ + private final MemoryManager memoryManager; + + /** The network component in the task manager */ + private final NetworkEnvironment networkEnvironment; + + /** The number of slots in the task manager, should be 1 for YARN */ + private final int numberOfSlots; + // --------- resource manager -------- private TaskExecutorToResourceManagerConnection resourceManagerConnection; // ------------------------------------------------------------------------ public TaskExecutor( + TaskManagerConfiguration taskManagerConfig, + ResourceID resourceID, + InstanceConnectionInfo connectionInfo, + MemoryManager memoryManager, 
+ IOManager ioManager, + NetworkEnvironment networkEnvironment, + int numberOfSlots, RpcService rpcService, - HighAvailabilityServices haServices, - ResourceID resourceID) { + HighAvailabilityServices haServices) { super(rpcService); - this.haServices = checkNotNull(haServices); + this.taskManagerConfig = checkNotNull(taskManagerConfig); this.resourceID = checkNotNull(resourceID); + this.connectionInfo = checkNotNull(connectionInfo); + this.memoryManager = checkNotNull(memoryManager); + this.ioManager = checkNotNull(ioManager); + this.networkEnvironment = checkNotNull(networkEnvironment); + this.numberOfSlots = checkNotNull(numberOfSlots); + this.haServices = checkNotNull(haServices); + } + + /** + * Starts and runs the TaskManager. + * <p/> + * This method first tries to select the network interface to use for the TaskManager + * communication. The network interface is used both for the actor communication + * (coordination) as well as for the data exchange between task managers. Unless + * the hostname/interface is explicitly configured in the configuration, this + * method will try out various interfaces and methods to connect to the JobManager + * and select the one where the connection attempt is successful. + * <p/> + * After selecting the network interface, this method brings up an actor system + * for the TaskManager and its actors, starts the TaskManager's services + * (library cache, shuffle network stack, ...), and starts the TaskManager itself. + * + * @param configuration The configuration for the TaskManager. + * @param taskManagerClass The actor class to instantiate. + * Allows to use TaskManager subclasses for example for YARN. + */ + public static void selectNetworkInterfaceAndRunTaskManager( + Configuration configuration, + ResourceID resourceID, + Class<? 
extends TaskManager> taskManagerClass) throws Exception { + + Tuple2<String, Integer> tuple2 = selectNetworkInterfaceAndPort(configuration); + + runTaskManager(tuple2._1(), resourceID, tuple2._2(), configuration, taskManagerClass); + } + + private static Tuple2<String, Integer> selectNetworkInterfaceAndPort(Configuration configuration) --- End diff -- Do we want to rely on Scala tuples here after the conversion from Scala to Java? I would suggest `InetAddress` as a return type here. > Implement TaskManager basic startup of all components in java > ------------------------------------------------------------- > > Key: FLINK-4363 > URL: https://issues.apache.org/jira/browse/FLINK-4363 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management > Reporter: Zhijiang Wang > Assignee: Zhijiang Wang > > Similar to the current {{TaskManager}}, but implements initialization and startup > of all components in Java instead of Scala. -- This message was sent by Atlassian JIRA (v6.3.4#6332)