[ https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15690846#comment-15690846 ]
ASF GitHub Bot commented on FLINK-4928: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2744#discussion_r89360108 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java --- @@ -0,0 +1,612 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import akka.actor.ActorSystem; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.OnCompletionActions; +import org.apache.flink.runtime.jobmaster.JobManagerServices; +import org.apache.flink.runtime.jobmaster.JobMaster; +import org.apache.flink.runtime.leaderelection.LeaderContender; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; +import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException; +import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import 
org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; +import org.apache.flink.runtime.security.SecurityContext; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.JvmShutdownSafeguard; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.yarn.cli.FlinkYarnSessionCli; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.duration.FiniteDuration; + +import javax.annotation.concurrent.GuardedBy; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.util.Map; +import java.util.UUID; + +/** + * This class is the executable entry point for the YARN application master. + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobMaster} + * and {@link org.apache.flink.yarn.YarnResourceManager}. + * + * The JobMasters handles Flink job execution, while the YarnResourceManager handles container + * allocation and failure detection. + */ +public class YarnFlinkApplicationMasterRunner implements LeaderContender, OnCompletionActions, FatalErrorHandler { --- End diff -- The current implementation addresses the per-job YARN cluster scenario, right? Maybe we could refactor the code so that we have an `AbstractYarnFlinkApplicationMasterRunner`, which does the generic YARN initialization, and two specializations: `YarnPerJobFlinkApplicationMasterRunner` and `YarnClusterFlinkApplicationRunner`. The former basically works like this one: it starts a `ResourceManager` and a `JobManager` with the provided user code jar. The latter starts a `Dispatcher` (still to be implemented) and a `ResourceManager`.
The `Dispatcher`'s task is to receive job submission requests and to spawn a new `JobMaster` for each request. The `JobMaster` is then responsible for running the submitted job. > Implement FLIP-6 YARN Application Master Runner > ----------------------------------------------- > > Key: FLINK-4928 > URL: https://issues.apache.org/jira/browse/FLINK-4928 > Project: Flink > Issue Type: Sub-task > Components: YARN > Environment: {{flip-6}} feature branch > Reporter: Stephan Ewen > Assignee: shuai.xu > > The Application Master Runner is the master process started in a YARN > container when submitting the Flink-on-YARN job to YARN. > It has the following data available: > - Flink jars > - Job jars > - JobGraph > - Environment variables > - Contextual information like security tokens and certificates > Its responsibilities are the following: > - Read all configuration and environment variables, computing the effective > configuration > - Start all shared components (Rpc, HighAvailability Services) > - Start the ResourceManager > - Start the JobManager Runner -- This message was sent by Atlassian JIRA (v6.3.4#6332)