
ASF GitHub Bot commented on FLINK-4928:

Github user shuai-xu commented on a diff in the pull request:

    --- Diff: 
    @@ -0,0 +1,612 @@
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.yarn;
    +import akka.actor.ActorSystem;
    +import org.apache.flink.api.common.JobExecutionResult;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.GlobalConfiguration;
    +import org.apache.flink.configuration.HighAvailabilityOptions;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.client.JobExecutionException;
    +import org.apache.flink.runtime.clusterframework.ApplicationStatus;
    +import org.apache.flink.runtime.clusterframework.BootstrapTools;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobmanager.OnCompletionActions;
    +import org.apache.flink.runtime.jobmaster.JobManagerServices;
    +import org.apache.flink.runtime.jobmaster.JobMaster;
    +import org.apache.flink.runtime.leaderelection.LeaderContender;
    +import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    +import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
    +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
    +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
    +import org.apache.flink.runtime.resourcemanager.ResourceManager;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.rpc.RpcService;
    +import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
    +import org.apache.flink.runtime.security.SecurityContext;
    +import org.apache.flink.runtime.util.EnvironmentInformation;
    +import org.apache.flink.runtime.util.JvmShutdownSafeguard;
    +import org.apache.flink.runtime.util.SignalHandler;
    +import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
    +import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.concurrent.duration.FiniteDuration;
    +import javax.annotation.concurrent.GuardedBy;
    +import java.io.File;
    +import java.io.FileInputStream;
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.util.Map;
    +import java.util.UUID;
    + * This class is the executable entry point for the YARN application 
    + * It starts actor system and the actors for {@link 
    + * and {@link org.apache.flink.yarn.YarnResourceManager}.
    + *
    + * The JobMasters handles Flink job execution, while the 
YarnResourceManager handles container
    + * allocation and failure detection.
    + */
    +public class YarnFlinkApplicationMasterRunner implements LeaderContender, 
OnCompletionActions, FatalErrorHandler {
    +   /** Logger */
    +   protected static final Logger LOG = 
    +   /** The process environment variables */
    +   private static final Map<String, String> ENV = System.getenv();
    +   /** The exit code returned if the initialization of the application 
master failed */
    +   private static final int INIT_ERROR_EXIT_CODE = 31;
    +   /** The job graph file path */
    +   private static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
    +   /** The lock to guard startup / shutdown / manipulation methods */
    +   private final Object lock = new Object();
    +   @GuardedBy("lock")
    +   private MetricRegistry metricRegistry;
    +   @GuardedBy("lock")
    +   private HighAvailabilityServices haServices;
    +   @GuardedBy("lock")
    +   private LeaderElectionService jmLeaderElectionService;
    +   @GuardedBy("lock")
    +   private RpcService jobMasterRpcService;
    +   @GuardedBy("lock")
    +   private RpcService resourceManagerRpcService;
    +   @GuardedBy("lock")
    +   private ResourceManager resourceManager;
    +   @GuardedBy("lock")
    +   private JobMaster jobMaster;
    +   @GuardedBy("lock")
    +   JobManagerServices jobManagerServices;
    +   @GuardedBy("lock")
    +   JobManagerMetricGroup jobManagerMetrics;
    +   @GuardedBy("lock")
    +   private JobGraph jobGraph;
    +   /** Flag marking the app master runner as started/running */
    +   private volatile boolean running;
    +   // 
    +   //  Program entry point
    +   // 
    +   /**
    +    * The entry point for the YARN application master.
    +    *
    +    * @param args The command line arguments.
    +    */
    +   public static void main(String[] args) {
    +           EnvironmentInformation.logEnvironmentInfo(LOG, "YARN 
ApplicationMaster runner", args);
    +           SignalHandler.register(LOG);
    +           JvmShutdownSafeguard.installAsShutdownHook(LOG);
    +           // run and exit with the proper return code
    +           int returnCode = new 
    +           System.exit(returnCode);
    +   }
    +   /**
    +    * The instance entry point for the YARN application master. Obtains 
user group
    +    * information and calls the main work method {@link 
#runApplicationMaster(org.apache.flink.configuration.Configuration)} as a
    +    * privileged action.
    +    *
    +    * @param args The command line arguments.
    +    * @return The process exit code.
    +    */
    +   protected int run(String[] args) {
    +           try {
    +                   LOG.debug("All environment variables: {}", ENV);
    +                   final String yarnClientUsername = 
    +                   require(yarnClientUsername != null, "YARN client user 
name environment variable {} not set",
    +                           YarnConfigKeys.ENV_HADOOP_USER_NAME);
    +                   final String currDir = ENV.get(Environment.PWD.key());
    +                   require(currDir != null, "Current working directory 
variable (%s) not set", Environment.PWD.key());
    +                   LOG.debug("Current working Directory: {}", currDir);
    +                   final String remoteKeytabPath = 
    +                   LOG.debug("remoteKeytabPath obtained {}", 
    +                   final String remoteKeytabPrincipal = 
    +                   LOG.info("remoteKeytabPrincipal obtained {}", 
    +                   String keytabPath = null;
    +                   if(remoteKeytabPath != null) {
    +                           File f = new File(currDir, 
    +                           keytabPath = f.getAbsolutePath();
    +                           LOG.debug("keytabPath: {}", keytabPath);
    +                   }
    +                   UserGroupInformation currentUser = 
    +                   LOG.info("YARN daemon is running as: {} Yarn client 
user obtainer: {}",
    +                                   currentUser.getShortUserName(), 
yarnClientUsername );
    +                   SecurityContext.SecurityConfiguration sc = new 
    +                   //To support Yarn Secure Integration Test Scenario
    +                   File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
    +                   if(krb5Conf.exists() && krb5Conf.canRead()) {
    +                           String krb5Path = krb5Conf.getAbsolutePath();
    +                           LOG.info("KRB5 Conf: {}", krb5Path);
    +                           org.apache.hadoop.conf.Configuration conf = new 
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
    +                           sc.setHadoopConfiguration(conf);
    +                   }
    +                   // Flink configuration
    +                   final Map<String, String> dynamicProperties =
    +                   LOG.debug("YARN dynamic properties: {}", 
    +                   final Configuration flinkConfig = 
createConfiguration(currDir, dynamicProperties);
    +                   if(keytabPath != null && remoteKeytabPrincipal != null) 
flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
    +                   }
    +                   return SecurityContext.getInstalled().runSecured(new 
SecurityContext.FlinkSecuredRunner<Integer>() {
    +                           @Override
    +                           public Integer run() {
    +                                   return 
    +                           }
    +                   });
    +           }
    +           catch (Throwable t) {
    +                   // make sure that everything whatever ends up in the log
    +                   LOG.error("YARN Application Master initialization 
failed", t);
    +                   return INIT_ERROR_EXIT_CODE;
    +           }
    +   }
    +   // 
    +   //  Core work method
    +   // 
    +   /**
    +    * The main work method, must run as a privileged action.
    +    *
    +    * @return The return code for the Java process.
    +    */
    +   protected int runApplicationMaster(Configuration config) {
    +           try {
    +                   // ---- (1) create common services
    +                   // Note that we use the "appMasterHostname" given by 
YARN here, to make sure
    +                   // we use the hostnames given by YARN consistently 
throughout akka.
    +                   // for akka "localhost" and "localhost.localdomain" are 
different actors.
    +                   final String appMasterHostname = 
    +                   require(appMasterHostname != null,
    +                                   "ApplicationMaster hostname variable %s 
not set", Environment.NM_HOST.key());
    +                   LOG.info("YARN assigned hostname for application 
master: {}", appMasterHostname);
    +                   // try to start the rpc service
    +                   // using the port range definition from the config.
    +                   final String amPortRange = config.getString(
    +                   synchronized (lock) {
    --- End diff --
    This is before starting; no other thread has been started yet, so it seems no 
thread will call shutdown before it.

> Implement FLIP-6 YARN Application Master Runner
> -----------------------------------------------
>                 Key: FLINK-4928
>                 URL: https://issues.apache.org/jira/browse/FLINK-4928
>             Project: Flink
>          Issue Type: Sub-task
>          Components: YARN
>         Environment: {{flip-6}} feature branch
>            Reporter: Stephan Ewen
>            Assignee: shuai.xu
> The Application Master Runner is the master process started in a YARN 
> container when submitting the Flink-on-YARN job to YARN.
> It has the following data available:
>   - Flink jars
>   - Job jars
>   - JobGraph
>   - Environment variables
>   - Contextual information like security tokens and certificates
> Its responsibility is the following:
>   - Read all configuration and environment variables, computing the effective 
> configuration
>   - Start all shared components (Rpc, HighAvailability Services)
>   - Start the ResourceManager
>   - Start the JobManager Runner

This message was sent by Atlassian JIRA

Reply via email to