[ https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15430405#comment-15430405 ]

ASF GitHub Bot commented on FLINK-4363:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2400#discussion_r75644911
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---
    @@ -35,27 +79,634 @@
      */
     public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
     
    -   /** The unique resource ID of this TaskExecutor */
    +   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutor.class);
    +
    +   /** Exit code handed to the ProcessReaper, which uses it when killing the JVM after the TaskManager actor dies */
    +   private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
    +
    +   /** The name of the TaskManager actor */
    +   private static final String TASK_MANAGER_NAME = "taskmanager";
    +
    +   /** The unique resource ID of this TaskManager */
        private final ResourceID resourceID;
     
        /** The access to the leader election and metadata storage services */
        private final HighAvailabilityServices haServices;
     
    -   // --------- resource manager --------
    +   /** The task manager configuration */
    +   private final TaskManagerConfiguration taskManagerConfig;
    +
    +   /** The connection information of the task manager */
    +   private final InstanceConnectionInfo connectionInfo;
    +
    +   /** The I/O manager component in the task manager */
    +   private final IOManager ioManager;
    +
    +   /** The memory manager component in the task manager */
    +   private final MemoryManager memoryManager;
    +
    +   /** The network component in the task manager */
    +   private final NetworkEnvironment networkEnvironment;
    +
    +   /** The number of slots in the task manager, should be 1 for YARN */
    +   private final int numberOfSlots;
     
    +   // --------- resource manager --------
        private TaskExecutorToResourceManagerConnection 
resourceManagerConnection; // NOTE(review): not final and not assigned by the constructor in this hunk; presumably set once a ResourceManager leader is known — confirm
     
        // 
------------------------------------------------------------------------
     
        public TaskExecutor( // NOTE(review): this patch reorders the parameters (resourceID moved up, haServices now last); every caller must be updated
    +                   TaskManagerConfiguration taskManagerConfig,
    +                   ResourceID resourceID,
    +                   InstanceConnectionInfo connectionInfo,
    +                   MemoryManager memoryManager,
    +                   IOManager ioManager,
    +                   NetworkEnvironment networkEnvironment,
    +                   int numberOfSlots,
                        RpcService rpcService,
    -                   HighAvailabilityServices haServices,
    -                   ResourceID resourceID) {
    +                   HighAvailabilityServices haServices) {
     
                super(rpcService);
     
    -           this.haServices = checkNotNull(haServices);
    +           this.taskManagerConfig = checkNotNull(taskManagerConfig);
                this.resourceID = checkNotNull(resourceID);
    +           this.connectionInfo = checkNotNull(connectionInfo);
    +           this.memoryManager = checkNotNull(memoryManager);
    +           this.ioManager = checkNotNull(ioManager);
    +           this.networkEnvironment = checkNotNull(networkEnvironment);
    +           this.numberOfSlots = checkNotNull(numberOfSlots); // NOTE(review): numberOfSlots is a primitive int, so it is autoboxed and this null check can never fail; a range check (numberOfSlots > 0) would be meaningful
    +           this.haServices = checkNotNull(haServices);
    +   }
    +
    +   /**
    +    * Starts and runs the TaskManager.
    +    * <p/>
    +    * This method first tries to select the network interface to use for 
the TaskManager
    +    * communication. The network interface is used both for the actor 
communication
    +    * (coordination) as well as for the data exchange between task 
managers. Unless
    +    * the hostname/interface is explicitly configured in the 
configuration, this
    +    * method will try out various interfaces and methods to connect to the 
JobManager
    +    * and select the one where the connection attempt is successful.
    +    * <p/>
    +    * After selecting the network interface, this method brings up an 
actor system
    +    * for the TaskManager and its actors, starts the TaskManager's services
    +    * (library cache, shuffle network stack, ...), and starts the 
TaskManager itself.
    +    * The call blocks until the TaskManager actor system terminates.
    +    *
    +    * @param configuration    The configuration for the TaskManager.
    +    * @param resourceID       The id of the resource which the task manager will run on.
    +    * @param taskManagerClass The actor class to instantiate.
    +    *                         Allows to use TaskManager subclasses for 
example for YARN.
    +    * @throws Exception       If the hostname/port cannot be determined, or if starting
    +    *                         or running the TaskManager fails.
    +    */
    +   public static void selectNetworkInterfaceAndRunTaskManager(
    +           Configuration configuration,
    +           ResourceID resourceID,
    +           Class<? extends TaskManager> taskManagerClass) throws Exception 
{
    +
    +           Tuple2<String, Integer> tuple2 = 
selectNetworkInterfaceAndPort(configuration); // (hostname, actor system port)
    +
    +           runTaskManager(tuple2._1(), resourceID, tuple2._2(), 
configuration, taskManagerClass);
    +   }
    +
    +   private static Tuple2<String, Integer> 
selectNetworkInterfaceAndPort(Configuration configuration)
    +           throws Exception { // Returns (hostname, actor system port): the configured hostname or one found via leader retrieval, and the configured IPC port (0 = any free port)
    +           String taskManagerHostname = 
configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null);
    +           if (taskManagerHostname != null) {
    +                   LOG.info("Using configured hostname/address for 
TaskManager: " + taskManagerHostname);
    +           } else {
    +                   LeaderRetrievalService leaderRetrievalService = 
LeaderRetrievalUtils.createLeaderRetrievalService(configuration); // NOTE(review): never explicitly stopped in this method — confirm findConnectingAddress releases it
    +                   FiniteDuration lookupTimeout = 
AkkaUtils.getLookupTimeout(configuration);
    +
    +                   InetAddress taskManagerAddress = 
LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, 
lookupTimeout);
    +                   taskManagerHostname = taskManagerAddress.getHostName();
    +                   LOG.info("TaskManager will use hostname/address '{}' 
({}) for communication.",
    +                           taskManagerHostname, 
taskManagerAddress.getHostAddress());
    +           }
    +
    +           // if no task manager port has been configured, use 0 (system 
will pick any free port)
    +           int actorSystemPort = 
configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
    +           if (actorSystemPort < 0 || actorSystemPort > 65535) { // valid TCP port range; 0 is allowed and means "pick automatically"
    +                   throw new IllegalConfigurationException("Invalid value 
for '" +
    +                           ConfigConstants.TASK_MANAGER_IPC_PORT_KEY +
    +                           "' (port for the TaskManager actor system) : " 
+ actorSystemPort +
    +                           " - Leave config parameter empty or use 0 to 
let the system choose a port automatically.");
    +           }
    +
    +           return new Tuple2<>(taskManagerHostname, actorSystemPort);
    +   }
    +
    +   /**
    +    * Starts and runs the TaskManager. Brings up an actor system for the 
TaskManager and its
    +    * actors, starts the TaskManager's services (library cache, shuffle 
network stack, ...),
    +    * and starts the TaskManager itself.
    +    * <p/>
    +    * This method will also spawn a process reaper for the TaskManager 
(kill the process if
    +    * the actor fails) and optionally start the JVM memory logging thread.
    +    * The call blocks until the actor system terminates (or startup fails).
    +    *
    +    * @param taskManagerHostname The hostname/address of the interface 
where the actor system
    +    *                            will communicate.
    +    * @param resourceID          The id of the resource which the task 
manager will run on.
    +    * @param actorSystemPort     The port at which the actor system will 
communicate.
    +    * @param configuration       The configuration for the TaskManager.
    +    * @param taskManagerClass    The actor class to instantiate. Allows 
the use of TaskManager
    +    *                            subclasses for example for YARN.
    +    * @throws IOException        If the actor system cannot be bound to the given address.
    +    * @throws Exception          If the actor system cannot be created, or if starting or
    +    *                            running the TaskManager fails (the actor system is shut
    +    *                            down before the error is rethrown).
    +    */
    +   private static void runTaskManager(
    +           String taskManagerHostname,
    +           ResourceID resourceID,
    +           int actorSystemPort,
    +           Configuration configuration,
    +           Class<? extends TaskManager> taskManagerClass) throws Exception 
{
    +
    +           LOG.info("Starting TaskManager");
    +
    +           // Bring up the TaskManager actor system first, bind it to the 
given address.
    +
    +           LOG.info("Starting TaskManager actor system at " +
    +                   NetUtils.hostAndPortToUrlString(taskManagerHostname, 
actorSystemPort));
    +
    +           ActorSystem taskManagerSystem;
    +           try {
    +                   Tuple2<String, Object> address = new Tuple2<String, 
Object>(taskManagerHostname, actorSystemPort);
    +                   Config akkaConfig = 
AkkaUtils.getAkkaConfig(configuration, new Some<>(address));
    +                   LOG.debug("Using akka configuration\n " + akkaConfig);
    +                   taskManagerSystem = 
AkkaUtils.createActorSystem(akkaConfig);
    +           } catch (Throwable t) {
    +                   if (t instanceof 
org.jboss.netty.channel.ChannelException) {
    +                           Throwable cause = t.getCause();
    +                           if (cause != null && t.getCause() instanceof 
java.net.BindException) { // NOTE(review): t.getCause() repeats the lookup already stored in 'cause'; testing 'cause' directly is clearer
    +                                   String address = 
NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort);
    +                                   throw new IOException("Unable to bind 
TaskManager actor system to address " +
    +                                           address + " - " + 
cause.getMessage(), t);
    +                           }
    +                   }
    +                   throw new Exception("Could not create TaskManager actor 
system", t);
    +           }
    +
    +           // start all the TaskManager services (network stack, library 
cache, ...)
    +           // and the TaskManager actor
    +           try {
    +                   LOG.info("Starting TaskManager actor");
    +                   ActorRef taskManagerActor = 
startTaskManagerComponentsAndActor(
    +                           configuration,
    +                           resourceID,
    +                           taskManagerSystem,
    +                           taskManagerHostname,
    +                           TASK_MANAGER_NAME,
    +                           null,
    +                           false,
    +                           taskManagerClass);
    +
    +                   // start a process reaper that watches the TaskManager actor. 
If the TaskManager actor dies,
    +                   // the process reaper will kill the JVM process (to 
ensure easy failure detection)
    +                   LOG.debug("Starting TaskManager process reaper");
    +
    +                   taskManagerSystem.actorOf(
    +                           Props.create(ProcessReaper.class, 
taskManagerActor, LOG, RUNTIME_FAILURE_RETURN_CODE),
    +                           "TaskManager_Process_Reaper");
    +
    +                   // if desired, start the logging daemon that 
periodically logs the memory usage information
    +                   if (LOG.isInfoEnabled() && configuration.getBoolean(
    +                           
ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD,
    +                           
ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD)) {
    +                           LOG.info("Starting periodic memory usage 
logger");
    +
    +                           long interval = configuration.getLong(
    +                                   
ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS,
    +                                   
ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS);
    +
    +                           MemoryLogger logger = new MemoryLogger(LOG, 
interval, taskManagerSystem);
    +                           logger.start();
    +                   }
    +
    +                   // block until everything is done
    +                   taskManagerSystem.awaitTermination();
    +           } catch (Throwable t) { // NOTE(review): this catch also covers awaitTermination() above, so failures after a successful start are logged as "starting up" errors
    +                   LOG.error("Error while starting up taskManager", t);
    +                   try {
    +                           taskManagerSystem.shutdown();
    +                   } catch (Throwable tt) {
    +                           LOG.warn("Could not cleanly shut down actor 
system", tt);
    +                   }
    +                   throw t;
    +           }
    +   }
    +
    +   // 
--------------------------------------------------------------------------
    +   //  Starting and running the TaskManager
    +   // 
--------------------------------------------------------------------------
    +
    +   /**
    +    * Starts the TaskManager components and the TaskManager actor: parses and checks the
    +    * configuration, starts the network environment, the memory manager and the I/O manager,
    +    * and creates the TaskManager actor (named, if a non-empty name is given) in the given
    +    * actor system.
    +    * <p/>
    +    * @param configuration                 The configuration for the 
TaskManager.
    +    * @param resourceID                    The id of the resource which 
the task manager will run on.
    +    * @param actorSystem                  The actor system that should run 
the TaskManager actor.
    +    * @param taskManagerHostname       The hostname/address that describes 
the TaskManager's data location.
    +    * @param taskManagerActorName      Optionally the name of the 
TaskManager actor. If none is given,
    +    *                                      the actor will use a random 
name.
    +    * @param leaderRetrievalService        Optionally, a leader retrieval 
service can be provided. If none is given,
    +    *                                      then a LeaderRetrievalService 
is constructed from the configuration.
    +    * @param localTaskManagerCommunication If true, the TaskManager will 
not initiate the TCP network stack.
    +    * @param taskManagerClass      The class of the TaskManager actor. May 
be used to give
    +    *                                      subclasses that understand 
additional actor messages.
    +    * @return An ActorRef to the TaskManager actor.
    +    * @throws org.apache.flink.configuration.IllegalConfigurationException 
    Thrown, if the given config contains illegal values.
    +    * @throws java.io.IOException      Thrown, if any of the I/O 
components (such as buffer pools,
    +    *                                       I/O manager, ...) cannot be 
properly started.
    +    * @throws java.lang.Exception      Thrown if some other error occurs 
while parsing the configuration
    +    *                                      or starting the TaskManager 
components.
    +    */
    +   public static ActorRef startTaskManagerComponentsAndActor(
    +           Configuration configuration,
    +           ResourceID resourceID,
    +           ActorSystem actorSystem,
    +           String taskManagerHostname,
    +           String taskManagerActorName,
    +           LeaderRetrievalService leaderRetrievalService,
    +           boolean localTaskManagerCommunication,
    +           Class<? extends TaskManager> taskManagerClass) throws Exception 
{
    +
    +           Tuple4<TaskManagerConfiguration, 
NetworkEnvironmentConfiguration, InstanceConnectionInfo, MemoryType> tuple4
    +                   = parseTaskManagerConfiguration(configuration, 
taskManagerHostname, localTaskManagerCommunication);
    +
    +           TaskManagerConfiguration taskManagerConfig = tuple4._1();
    +           NetworkEnvironmentConfiguration netConfig = tuple4._2();
    +           InstanceConnectionInfo connectionInfo = tuple4._3();
    +           MemoryType memType = tuple4._4();
    +
    +           // pre-start checks
    +           checkTempDirs(taskManagerConfig.tmpDirPaths());
    +
    +           ExecutionContext executionContext = 
ExecutionContexts$.MODULE$.fromExecutor(new ForkJoinPool()); // NOTE(review): this ForkJoinPool is never shut down in this method, not even when a later step throws
    +
    +           // we start the network first, to make sure it can allocate its 
buffers first
    +           NetworkEnvironment network = new 
NetworkEnvironment(executionContext, taskManagerConfig.timeout(), netConfig);
    +
    +           // computing the amount of memory to use depends on how much 
memory is available
    +           // it strictly needs to happen AFTER the network stack has been 
initialized
    +
    +           // check if a value has been configured
    +           long configuredMemory = 
configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
    +           checkConfigParameter(configuredMemory == -1 || configuredMemory 
> 0, configuredMemory,
    +                   ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
    +                   "MemoryManager needs at least one MB of memory. " +
    +                           "If you leave this config parameter empty, the 
system automatically " +
    +                           "pick a fraction of the available memory."); // NOTE(review): message grammar: "automatically pick" should read "automatically picks"
    +
    +           long memorySize;
    +           boolean preAllocateMemory = configuration.getBoolean(
    +                   ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY,
    +                   
ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE);
    +           if (configuredMemory > 0) {
    +                   if (preAllocateMemory) {
    +                           LOG.info("Using {} MB for managed memory." , 
configuredMemory);
    +                   } else {
    +                           LOG.info("Limiting managed memory to {} MB, 
memory will be allocated lazily." , configuredMemory);
    +                   }
    +                   memorySize = configuredMemory << 20; // megabytes to 
bytes
    +           } else {
    +                   float fraction = configuration.getFloat(
    +                           
ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
    +                           
ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
    +                   checkConfigParameter(fraction > 0.0f && fraction < 
1.0f, fraction,
    +                           
ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
    +                           "MemoryManager fraction of the free memory must 
be between 0.0 and 1.0");
    +
    +                   if (memType == MemoryType.HEAP) {
    +                           long relativeMemSize = (long) 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * fraction);
    +                           if (preAllocateMemory) {
    +                                   LOG.info("Using {} of the currently 
free heap space for managed heap memory ({} MB)." ,
    +                                           fraction , relativeMemSize >> 
20);
    +                           } else {
    +                                   LOG.info("Limiting managed memory to {} 
of the currently free heap space ({} MB), " +
    +                                           "memory will be allocated 
lazily." , fraction , relativeMemSize >> 20);
    +                           }
    +                           memorySize = relativeMemSize;
    +                   } else if (memType == MemoryType.OFF_HEAP) {
    +                           // The maximum heap memory has been adjusted 
according to the fraction
    +                           long maxMemory = 
EnvironmentInformation.getMaxJvmHeapMemory();
    +                           long directMemorySize = (long) (maxMemory / 
(1.0 - fraction) * fraction); // scales the (already reduced) max heap back up by 1 / (1 - fraction), then takes 'fraction' of that total — assumes the heap was sized as total * (1 - fraction); confirm
    +                           if (preAllocateMemory) {
    +                                   LOG.info("Using {} of the maximum 
memory size for managed off-heap memory ({} MB)." ,
    +                                           fraction, directMemorySize >> 
20);
    +                           } else {
    +                                   LOG.info("Limiting managed memory to {} 
of the maximum memory size ({} MB)," +
    +                                           " memory will be allocated 
lazily.", fraction, directMemorySize >> 20);
    +                           }
    +                           memorySize = directMemorySize;
    +                   } else {
    +                           throw new RuntimeException("No supported memory 
type detected.");
    +                   }
    +           }
    +
    +           // now start the memory manager
    +           MemoryManager memoryManager;
    +           try {
    +                   memoryManager = new MemoryManager(
    +                           memorySize,
    +                           taskManagerConfig.numberOfSlots(),
    +                           netConfig.networkBufferSize(),
    +                           memType,
    +                           preAllocateMemory);
    +           } catch (OutOfMemoryError e) {
    +                   if (memType == MemoryType.HEAP) {
    +                           throw new Exception("OutOfMemory error (" + 
e.getMessage() +
    +                                   ") while allocating the TaskManager 
heap memory (" + memorySize + " bytes).", e);
    +                   } else if (memType == MemoryType.OFF_HEAP) {
    +                           throw new Exception("OutOfMemory error (" + 
e.getMessage() +
    +                                   ") while allocating the TaskManager 
off-heap memory (" + memorySize +
    +                                   " bytes).Try increasing the maximum 
direct memory (-XX:MaxDirectMemorySize)", e); // NOTE(review): message lacks a space after "bytes)."
    +                   } else {
    +                           throw e;
    +                   }
    +           }
    +
    +           // start the I/O manager, it will create some temp directories.
    +           IOManager ioManager = new 
IOManagerAsync(taskManagerConfig.tmpDirPaths()); // NOTE(review): like the network environment and memory manager, this is not released if creating the actor below fails
    +
    +           if (leaderRetrievalService == null){
    +                   leaderRetrievalService = 
LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
    +           }
    +
    +           // create the actor properties (which define the actor 
constructor parameters)
    +           Props tmProps = Props.create(
    +                   taskManagerClass,
    +                   taskManagerConfig,
    +                   resourceID,
    +                   connectionInfo,
    +                   memoryManager,
    +                   ioManager,
    +                   network,
    +                   taskManagerConfig.numberOfSlots(),
    +                   leaderRetrievalService);
    +
    +           ActorRef taskManagerActorRef;
    +           if (taskManagerActorName != null && 
!taskManagerActorName.equals("")) {
    +                   taskManagerActorRef = actorSystem.actorOf(tmProps, 
taskManagerActorName);
    +           } else {
    +                   taskManagerActorRef = actorSystem.actorOf(tmProps);
    +           }
    +
    +           return taskManagerActorRef;
    +   }
    +
    +   // 
--------------------------------------------------------------------------
    +   //  Parsing and checking the TaskManager Configuration
    +   // 
--------------------------------------------------------------------------
    +
    +   /**
    +    * Utility method to extract TaskManager config parameters from the 
configuration and to
    +    * sanity check them.
    +    *
    +    * @param configuration                 The configuration.
    +    * @param taskManagerHostname           The host name under which the 
TaskManager communicates.
    +    * @param localTaskManagerCommunication True, to skip initializing the 
network stack.
    +    *                                      Use only in cases where only 
one task manager runs.
    +    * @return A tuple (TaskManagerConfiguration, network configuration,
    +    * InstanceConnectionInfo, MemoryType).
    +    */
    +   private static Tuple4<TaskManagerConfiguration, 
NetworkEnvironmentConfiguration, InstanceConnectionInfo, MemoryType>
    +           parseTaskManagerConfiguration(Configuration configuration, 
String taskManagerHostname, boolean localTaskManagerCommunication)
    +           throws Exception {
    +
    +           // ------- read values from the config and check them ---------
    +           //                      (a lot of them)
    +
    +           // ----> hosts / ports for communication and data exchange
    +
    +           int dataport = 
configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
    +                   ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT);
    +           if (dataport == 0) {
    +                   dataport = NetUtils.getAvailablePort();
    +           }
    +           checkConfigParameter(dataport > 0, dataport, 
ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
    +                   "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
    +
    +           InetAddress taskManagerAddress = 
InetAddress.getByName(taskManagerHostname);
    +           InstanceConnectionInfo connectionInfo = new 
InstanceConnectionInfo(taskManagerAddress, dataport);
    +
    +           // ----> memory / network stack (shuffles/broadcasts), task 
slots, temp directories
    +
    +           // we need this because many configs have been written with a 
"-1" entry
    +           int slots = 
configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
    +           if (slots == -1) {
    +                   slots = 1;
    +           }
    +           checkConfigParameter(slots >= 1, slots, 
ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
    +                   "Number of task slots must be at least one.");
    +
    +           int numNetworkBuffers = configuration.getInteger(
    +                   ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
    +                   
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
    +           checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
    +                   ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 
"");
    +
    +           int pageSize = configuration.getInteger(
    +                   ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
    +                   
ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE);
    +           // check page size of for minimum size
    +           checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, 
pageSize,
    +                   ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
    +                   "Minimum memory segment size is " + 
MemoryManager.MIN_PAGE_SIZE);
    +           // check page size for power of two
    +           checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
    +                   ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
    +                   "Memory segment size must be a power of 2.");
    +
    +           // check whether we use heap or off-heap memory
    +           MemoryType memType;
    --- End diff --
    
    `memType` could be declared `final` to make it explicit that it is assigned exactly once.


> Implement TaskManager basic startup of all components in java
> -------------------------------------------------------------
>
>                 Key: FLINK-4363
>                 URL: https://issues.apache.org/jira/browse/FLINK-4363
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cluster Management
>            Reporter: Zhijiang Wang
>            Assignee: Zhijiang Wang
>
> Similar to the current {{TaskManager}}, but implement the initialization and startup of 
> all components in Java instead of Scala.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to