[ 
https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458199#comment-15458199
 ] 

ASF GitHub Bot commented on FLINK-4505:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2461#discussion_r77327741
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/YarnTaskExecutorFactory.java
 ---
    @@ -0,0 +1,198 @@
    +package org.apache.flink.runtime.taskexecutor;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.IllegalConfigurationException;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    +import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
    +import org.apache.flink.runtime.taskmanager.MemoryLogger;
    +import org.apache.flink.runtime.util.LeaderRetrievalUtils;
    +import org.apache.flink.util.NetUtils;
    +
    +import akka.actor.ActorSystem;
    +import akka.util.Timeout;
    +
    +import scala.Some;
    +import scala.Tuple2;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import com.typesafe.config.Config;
    +
    +import java.io.IOException;
    +import java.net.InetAddress;
    +import java.net.InetSocketAddress;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * An factory for creating {@link TaskExecutor} and starting it in yarn 
mode.
    + */
    +public class YarnTaskExecutorFactory extends TaskExecutorFactory {
    +
    +   public YarnTaskExecutorFactory(Configuration configuration, ResourceID 
resourceID) {
    +           super(configuration, resourceID);
    +   }
    +
    +   @Override
    +   public TaskExecutor createAndStartTaskExecutor() throws Exception {
    +           return selectNetworkInterfaceAndRunTaskManager(configuration, 
resourceID);
    +   }
    +
    +   /**
    +    * Starts and runs the TaskManager.
    +    * <p/>
    +    * This method first tries to select the network interface to use for 
the TaskManager
    +    * communication. The network interface is used both for the actor 
communication
    +    * (coordination) as well as for the data exchange between task 
managers. Unless
    +    * the hostname/interface is explicitly configured in the 
configuration, this
    +    * method will try out various interfaces and methods to connect to the 
JobManager
    +    * and select the one where the connection attempt is successful.
    +    * <p/>
    +    * After selecting the network interface, this method brings up an 
actor system
    +    * for the TaskManager and its actors, starts the TaskManager's services
    +    * (library cache, shuffle network stack, ...), and starts the 
TaskManager itself.
    +    *
    +    * @param configuration    The configuration for the TaskManager.
    +    * @param resourceID       The id of the resource which the task 
manager will run on.
    +    */
    +   private TaskExecutor selectNetworkInterfaceAndRunTaskManager(
    --- End diff --
    
    Why does only the Yarn factory selects the network interface and not the 
standalone implementation?


> Implement TaskManagerFactory to bring up TaskManager for different modes
> ------------------------------------------------------------------------
>
>                 Key: FLINK-4505
>                 URL: https://issues.apache.org/jira/browse/FLINK-4505
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cluster Management
>            Reporter: Zhijiang Wang
>            Assignee: Zhijiang Wang
>            Priority: Minor
>
> Implement {{TaskExecutorFactory}} that should be an abstract class with the 
> helper methods to bring up the {{TaskManager}}. The factory can be 
> implemented by some classes to start a {{TaskManager}} in different modes 
> (testing, standalone, yarn).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to