KarmaGYZ commented on a change in pull request #13311:
URL: https://github.com/apache/flink/pull/13311#discussion_r488361230



##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -72,354 +62,237 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
 /**
- * The yarn implementation of the resource manager. Used when the system is 
started
- * via the resource framework YARN.
+ * Implementation of {@link ResourceManagerDriver} for Kubernetes deployment.
  */
-public class YarnResourceManager extends 
LegacyActiveResourceManager<YarnWorkerNode>
-               implements AMRMClientAsync.CallbackHandler, 
NMClientAsync.CallbackHandler {
+public class YarnResourceManagerDriver extends 
AbstractResourceManagerDriver<YarnWorkerNode> {
 
        private static final Priority RM_REQUEST_PRIORITY = 
Priority.newInstance(1);
 
-       /** YARN container map. */
-       private final ConcurrentMap<ResourceID, YarnWorkerNode> workerNodeMap;
-
        /** Environment variable name of the hostname given by the YARN.
         * In task executor we use the hostnames given by YARN consistently 
throughout akka */
        static final String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID";
 
        static final String ERROR_MASSAGE_ON_SHUTDOWN_REQUEST = "Received 
shutdown request from YARN ResourceManager.";
 
-       /** Default heartbeat interval between this resource manager and the 
YARN ResourceManager. */
-       private final int yarnHeartbeatIntervalMillis;
-
        private final YarnConfiguration yarnConfig;
 
-       @Nullable
-       private final String webInterfaceUrl;
+       /** The process environment variables. */
+       private final YarnResourceManagerDriverConfiguration configuration;
 
-       /** The heartbeat interval while the resource master is waiting for 
containers. */
-       private final int containerRequestHeartbeatIntervalMillis;
+       /** Default heartbeat interval between this resource manager and the 
YARN ResourceManager. */
+       private final int yarnHeartbeatIntervalMillis;
 
        /** Client to communicate with the Resource Manager (YARN's master). */
        private AMRMClientAsync<AMRMClient.ContainerRequest> 
resourceManagerClient;
 
+       /** The heartbeat interval while the resource master is waiting for 
containers. */
+       private final int containerRequestHeartbeatIntervalMillis;
+
        /** Client to communicate with the Node manager and launch TaskExecutor 
processes. */
        private NMClientAsync nodeManagerClient;
 
-       private final WorkerSpecContainerResourceAdapter 
workerSpecContainerResourceAdapter;
+       /** Request resource futures, keyed by container ids. */
+       private final Map<TaskExecutorProcessSpec, 
Queue<CompletableFuture<YarnWorkerNode>>> requestResourceFutures;
+
+       private final TaskExecutorProcessSpecContainerResourceAdapter 
taskExecutorProcessSpecContainerResourceAdapter;
 
        private final RegisterApplicationMasterResponseReflector 
registerApplicationMasterResponseReflector;
 
-       private WorkerSpecContainerResourceAdapter.MatchingStrategy 
matchingStrategy;
-
-       public YarnResourceManager(
-                       RpcService rpcService,
-                       ResourceID resourceId,
-                       Configuration flinkConfig,
-                       Map<String, String> env,
-                       HighAvailabilityServices highAvailabilityServices,
-                       HeartbeatServices heartbeatServices,
-                       SlotManager slotManager,
-                       ResourceManagerPartitionTrackerFactory 
clusterPartitionTrackerFactory,
-                       JobLeaderIdService jobLeaderIdService,
-                       ClusterInformation clusterInformation,
-                       FatalErrorHandler fatalErrorHandler,
-                       @Nullable String webInterfaceUrl,
-                       ResourceManagerMetricGroup resourceManagerMetricGroup) {
-               super(
-                       flinkConfig,
-                       env,
-                       rpcService,
-                       resourceId,
-                       highAvailabilityServices,
-                       heartbeatServices,
-                       slotManager,
-                       clusterPartitionTrackerFactory,
-                       jobLeaderIdService,
-                       clusterInformation,
-                       fatalErrorHandler,
-                       resourceManagerMetricGroup);
+       private 
TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy 
matchingStrategy;
+
+       private final YarnResourceManagerClientFactory 
yarnResourceManagerClientFactory;
+
+       private final YarnNodeManagerClientFactory yarnNodeManagerClientFactory;
+
+       public YarnResourceManagerDriver(
+               Configuration flinkConfig,
+               YarnResourceManagerDriverConfiguration configuration,
+               YarnResourceManagerClientFactory 
yarnResourceManagerClientFactory,
+               YarnNodeManagerClientFactory yarnNodeManagerClientFactory) {
+               super(flinkConfig, 
GlobalConfiguration.loadConfiguration(configuration.getCurrentDir()));
+
                this.yarnConfig = new YarnConfiguration();
-               this.workerNodeMap = new ConcurrentHashMap<>();
+               this.requestResourceFutures = new HashMap<>();
+               this.configuration = configuration;
+
                final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
-                               YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 
1000;
+                       YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000;
 
                final long yarnExpiryIntervalMS = yarnConfig.getLong(
-                               YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
-                               
YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
+                       YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
+                       YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
 
                if (yarnHeartbeatIntervalMS >= yarnExpiryIntervalMS) {
                        log.warn("The heartbeat interval of the Flink 
Application master ({}) is greater " +
                                        "than YARN's expiry interval ({}). The 
application is likely to be killed by YARN.",
-                                       yarnHeartbeatIntervalMS, 
yarnExpiryIntervalMS);
+                               yarnHeartbeatIntervalMS, yarnExpiryIntervalMS);
                }
                yarnHeartbeatIntervalMillis = yarnHeartbeatIntervalMS;
                containerRequestHeartbeatIntervalMillis = 
flinkConfig.getInteger(YarnConfigOptions.CONTAINER_REQUEST_HEARTBEAT_INTERVAL_MILLISECONDS);
 
-               this.webInterfaceUrl = webInterfaceUrl;
-
-               this.workerSpecContainerResourceAdapter = 
Utils.createWorkerSpecContainerResourceAdapter(flinkConfig, yarnConfig);
+               this.taskExecutorProcessSpecContainerResourceAdapter = 
Utils.createTaskExecutorProcessSpecContainerResourceAdapter(flinkConfig, 
yarnConfig);
                this.registerApplicationMasterResponseReflector = new 
RegisterApplicationMasterResponseReflector(log);
 
                this.matchingStrategy = 
flinkConfig.getBoolean(YarnConfigOptionsInternal.MATCH_CONTAINER_VCORES) ?
-                       
WorkerSpecContainerResourceAdapter.MatchingStrategy.MATCH_VCORE :
-                       
WorkerSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE;
-       }
-
-       protected AMRMClientAsync<AMRMClient.ContainerRequest> 
createAndStartResourceManagerClient(
-                       YarnConfiguration yarnConfiguration,
-                       int yarnHeartbeatIntervalMillis,
-                       @Nullable String webInterfaceUrl) throws Exception {
-               AMRMClientAsync<AMRMClient.ContainerRequest> 
resourceManagerClient = AMRMClientAsync.createAMRMClientAsync(
-                       yarnHeartbeatIntervalMillis,
-                       this);
-
-               resourceManagerClient.init(yarnConfiguration);
-               resourceManagerClient.start();
-
-               //TODO: change akka address to tcp host and port, the 
getAddress() interface should return a standard tcp address
-               Tuple2<String, Integer> hostPort = parseHostPort(getAddress());
-
-               final int restPort;
-
-               if (webInterfaceUrl != null) {
-                       final int lastColon = webInterfaceUrl.lastIndexOf(':');
-
-                       if (lastColon == -1) {
-                               restPort = -1;
-                       } else {
-                               restPort = 
Integer.valueOf(webInterfaceUrl.substring(lastColon + 1));
-                       }
-               } else {
-                       restPort = -1;
-               }
-
-               final RegisterApplicationMasterResponse 
registerApplicationMasterResponse =
-                       
resourceManagerClient.registerApplicationMaster(hostPort.f0, restPort, 
webInterfaceUrl);
-               
getContainersFromPreviousAttempts(registerApplicationMasterResponse);
-               updateMatchingStrategy(registerApplicationMasterResponse);
+                       
TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy.MATCH_VCORE :
+                       
TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE;
 
-               return resourceManagerClient;
+               this.yarnResourceManagerClientFactory = 
yarnResourceManagerClientFactory;
+               this.yarnNodeManagerClientFactory = 
yarnNodeManagerClientFactory;
        }
 
-       private void getContainersFromPreviousAttempts(final 
RegisterApplicationMasterResponse registerApplicationMasterResponse) {
-               final List<Container> containersFromPreviousAttempts =
-                       
registerApplicationMasterResponseReflector.getContainersFromPreviousAttempts(registerApplicationMasterResponse);
-
-               log.info("Recovered {} containers from previous attempts 
({}).", containersFromPreviousAttempts.size(), containersFromPreviousAttempts);
-
-               for (final Container container : 
containersFromPreviousAttempts) {
-                       final ResourceID resourceID = 
getContainerResourceId(container);
-                       workerNodeMap.put(resourceID, new 
YarnWorkerNode(container, resourceID));
-               }
-       }
-
-       private void updateMatchingStrategy(final 
RegisterApplicationMasterResponse registerApplicationMasterResponse) {
-               final Optional<Set<String>> schedulerResourceTypesOptional =
-                       
registerApplicationMasterResponseReflector.getSchedulerResourceTypeNames(registerApplicationMasterResponse);
-
-               final WorkerSpecContainerResourceAdapter.MatchingStrategy 
strategy;
-               if (schedulerResourceTypesOptional.isPresent()) {
-                       Set<String> types = 
schedulerResourceTypesOptional.get();
-                       log.info("Register application master response contains 
scheduler resource types: {}.", types);
-                       matchingStrategy = types.contains("CPU") ?
-                               
WorkerSpecContainerResourceAdapter.MatchingStrategy.MATCH_VCORE :
-                               
WorkerSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE;
-               } else {
-                       log.info("Register application master response does not 
contain scheduler resource types, use '{}'.",
-                               
YarnConfigOptionsInternal.MATCH_CONTAINER_VCORES.key());
-               }
-               log.info("Container matching strategy: {}.", matchingStrategy);
-       }
-
-       protected NMClientAsync 
createAndStartNodeManagerClient(YarnConfiguration yarnConfiguration) {
-               // create the client to communicate with the node managers
-               NMClientAsync nodeManagerClient = 
NMClientAsync.createNMClientAsync(this);
-               nodeManagerClient.init(yarnConfiguration);
-               nodeManagerClient.start();
-               return nodeManagerClient;
-       }
-
-       @Override
-       protected Configuration loadClientConfiguration() {
-               return 
GlobalConfiguration.loadConfiguration(env.get(ApplicationConstants.Environment.PWD.key()));
-       }
+       // 
------------------------------------------------------------------------
+       //  ResourceManagerDriver
+       // 
------------------------------------------------------------------------
 
        @Override
-       protected void initialize() throws ResourceManagerException {
+       protected void initializeInternal() throws Exception {
+               final YarnContainerEventHandler yarnContainerEventHandler = new 
YarnContainerEventHandler();
                try {
-                       resourceManagerClient = 
createAndStartResourceManagerClient(
-                               yarnConfig,
+                       resourceManagerClient = 
yarnResourceManagerClientFactory.createResourceManagerClient(
                                yarnHeartbeatIntervalMillis,
-                               webInterfaceUrl);
+                               yarnContainerEventHandler);
+                       resourceManagerClient.init(yarnConfig);
+                       resourceManagerClient.start();
+
+                       final RegisterApplicationMasterResponse 
registerApplicationMasterResponse = registerApplicationMaster();
+                       
getContainersFromPreviousAttempts(registerApplicationMasterResponse);
+                       
updateMatchingStrategy(registerApplicationMasterResponse);
                } catch (Exception e) {
                        throw new ResourceManagerException("Could not start 
resource manager client.", e);
                }
 
-               nodeManagerClient = createAndStartNodeManagerClient(yarnConfig);
+               nodeManagerClient = 
yarnNodeManagerClientFactory.createNodeManagerClient(yarnContainerEventHandler);
+               nodeManagerClient.init(yarnConfig);
+               nodeManagerClient.start();
        }
 
        @Override
-       public void terminate() throws Exception {
+       public CompletableFuture<Void> terminate() {
                // shut down all components
-               Exception firstException = null;
+               Exception exception = null;
 
                if (resourceManagerClient != null) {
                        try {
                                resourceManagerClient.stop();
                        } catch (Exception e) {
-                               firstException = e;
+                               exception = e;
                        }
                }
 
                if (nodeManagerClient != null) {
                        try {
                                nodeManagerClient.stop();
                        } catch (Exception e) {
-                               firstException = 
ExceptionUtils.firstOrSuppressed(e, firstException);
+                               exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
                        }
                }
 
-               ExceptionUtils.tryRethrowException(firstException);
+               return exception == null ?
+                       FutureUtils.completedVoidFuture() :
+                       FutureUtils.completedExceptionally(exception);
        }
 
        @Override
-       protected void internalDeregisterApplication(
-                       ApplicationStatus finalStatus,
-                       @Nullable String diagnostics) {
-
+       public void deregisterApplication(ApplicationStatus finalStatus, 
@Nullable String optionalDiagnostics) {
                // first, de-register from YARN
-               FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus);
+               final FinalApplicationStatus yarnStatus = 
getYarnStatus(finalStatus);
                log.info("Unregister application from the YARN Resource Manager 
with final status {}.", yarnStatus);
 
                final Optional<URL> historyServerURL = 
HistoryServerUtils.getHistoryServerURL(flinkConfig);
 
                final String appTrackingUrl = 
historyServerURL.map(URL::toString).orElse("");
 
                try {
-                       
resourceManagerClient.unregisterApplicationMaster(yarnStatus, diagnostics, 
appTrackingUrl);
+                       
resourceManagerClient.unregisterApplicationMaster(yarnStatus, 
optionalDiagnostics, appTrackingUrl);
                } catch (Throwable t) {
                        log.error("Could not unregister the application 
master.", t);
                }
 
-               Utils.deleteApplicationFiles(env);
+               Utils.deleteApplicationFiles(configuration.getYarnFiles());
        }
 
        @Override
-       public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
-               return requestYarnContainer(workerResourceSpec);
-       }
+       public CompletableFuture<YarnWorkerNode> 
requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
+               final Optional<Resource> containerResourceOptional = 
getContainerResource(taskExecutorProcessSpec);
+               final CompletableFuture<YarnWorkerNode> requestResourceFuture = 
new CompletableFuture<>();
 
-       @VisibleForTesting
-       Optional<Resource> getContainerResource(WorkerResourceSpec 
workerResourceSpec) {
-               return 
workerSpecContainerResourceAdapter.tryComputeContainerResource(workerResourceSpec);
+               if (containerResourceOptional.isPresent()) {
+                       
resourceManagerClient.addContainerRequest(getContainerRequest(containerResourceOptional.get()));
+
+                       // make sure we transmit the request fast and receive 
fast news of granted allocations
+                       
resourceManagerClient.setHeartbeatInterval(containerRequestHeartbeatIntervalMillis);
+
+                       
requestResourceFutures.computeIfAbsent(taskExecutorProcessSpec, ignore -> new 
LinkedList<>()).add(requestResourceFuture);
+
+                       log.info("Requesting new TaskExecutor container with 
resource {}.", taskExecutorProcessSpec);
+               } else {
+                       requestResourceFuture.completeExceptionally(
+                               new 
ResourceManagerException(String.format("Could not compute the container 
Resource from the given TaskExecutorProcessSpec %s.", 
taskExecutorProcessSpec)));
+               }
+
+               return requestResourceFuture;
        }
 
        @Override
-       public boolean stopWorker(final YarnWorkerNode workerNode) {
+       public void releaseResource(YarnWorkerNode workerNode) {
                final Container container = workerNode.getContainer();
                log.info("Stopping container {}.", 
workerNode.getResourceID().getStringWithMetadata());
                nodeManagerClient.stopContainerAsync(container.getId(), 
container.getNodeId());
                
resourceManagerClient.releaseAssignedContainer(container.getId());
-               workerNodeMap.remove(workerNode.getResourceID());
-               return true;
-       }
-
-       @Override
-       protected YarnWorkerNode workerStarted(ResourceID resourceID) {
-               return workerNodeMap.get(resourceID);
        }
 
        // 
------------------------------------------------------------------------
-       //  AMRMClientAsync CallbackHandler methods
+       //  Internal
        // 
------------------------------------------------------------------------
 
-       @Override
-       public float getProgress() {
-               // Temporarily need not record the total size of asked and 
allocated containers
-               return 1;
-       }
-
-       @Override
-       public void onContainersCompleted(final List<ContainerStatus> statuses) 
{
-               runAsync(() -> {
-                               log.debug("YARN ResourceManager reported the 
following containers completed: {}.", statuses);
-                               for (final ContainerStatus containerStatus : 
statuses) {
-
-                                       final ResourceID resourceId = new 
ResourceID(containerStatus.getContainerId().toString());
-                                       final YarnWorkerNode yarnWorkerNode = 
workerNodeMap.remove(resourceId);
-
-                                       
notifyAllocatedWorkerStopped(resourceId);
-
-                                       if (yarnWorkerNode != null) {
-                                               // Container completed 
unexpectedly ~> start a new one
-                                               
requestYarnContainerIfRequired();
-                                       }
-                                       // Eagerly close the connection with 
task manager.
-                                       closeTaskManagerConnection(resourceId, 
new Exception(containerStatus.getDiagnostics()));
-                               }
-                       }
-               );
-       }
-
-       @Override
-       public void onContainersAllocated(List<Container> containers) {
-               runAsync(() -> {
-                       log.info("Received {} containers.", containers.size());
-
-                       for (Map.Entry<Resource, List<Container>> entry : 
groupContainerByResource(containers).entrySet()) {
-                               onContainersOfResourceAllocated(entry.getKey(), 
entry.getValue());
-                       }
-
-                       // if we are waiting for no further containers, we can 
go to the
-                       // regular heartbeat interval
-                       if (getNumRequestedNotAllocatedWorkers() <= 0) {
-                               
resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
-                       }
-               });
-       }
-
-       private Map<Resource, List<Container>> 
groupContainerByResource(List<Container> containers) {
-               return 
containers.stream().collect(Collectors.groupingBy(Container::getResource));
-       }
-
        private void onContainersOfResourceAllocated(Resource resource, 
List<Container> containers) {
-               final List<WorkerResourceSpec> pendingWorkerResourceSpecs =
-                       
workerSpecContainerResourceAdapter.getWorkerSpecs(resource, 
matchingStrategy).stream()
+               final List<TaskExecutorProcessSpec> 
pendingTaskExecutorProcessSpecs =
+                       
taskExecutorProcessSpecContainerResourceAdapter.getTaskExecutorProcessSpec(resource,
 matchingStrategy).stream()
                                .flatMap(spec -> 
Collections.nCopies(getNumRequestedNotAllocatedWorkersFor(spec), spec).stream())
                                .collect(Collectors.toList());
 
-               int numPending = pendingWorkerResourceSpecs.size();
+               int numPending = pendingTaskExecutorProcessSpecs.size();
                log.info("Received {} containers with resource {}, {} pending 
container requests.",
                        containers.size(),
                        resource,
                        numPending);
 
                final Iterator<Container> containerIterator = 
containers.iterator();
-               final Iterator<WorkerResourceSpec> pendingWorkerSpecIterator = 
pendingWorkerResourceSpecs.iterator();
+               final Iterator<TaskExecutorProcessSpec> 
pendingTaskExecutorProcessSpecIterator = 
pendingTaskExecutorProcessSpecs.iterator();
                final Iterator<AMRMClient.ContainerRequest> 
pendingRequestsIterator =
-                       getPendingRequestsAndCheckConsistency(resource, 
pendingWorkerResourceSpecs.size()).iterator();
+                       getPendingRequestsAndCheckConsistency(resource, 
pendingTaskExecutorProcessSpecs.size()).iterator();
 
                int numAccepted = 0;
-               while (containerIterator.hasNext() && 
pendingWorkerSpecIterator.hasNext()) {
-                       final WorkerResourceSpec workerResourceSpec = 
pendingWorkerSpecIterator.next();
+               while (containerIterator.hasNext() && 
pendingTaskExecutorProcessSpecIterator.hasNext()) {
+                       final TaskExecutorProcessSpec taskExecutorProcessSpec = 
pendingTaskExecutorProcessSpecIterator.next();
                        final Container container = containerIterator.next();
                        final AMRMClient.ContainerRequest pendingRequest = 
pendingRequestsIterator.next();
                        final ResourceID resourceId = 
getContainerResourceId(container);
+                       final CompletableFuture<YarnWorkerNode> 
requestResourceFuture =
+                               Preconditions.checkNotNull(
+                                       Preconditions.checkNotNull(
+                                               
requestResourceFutures.get(taskExecutorProcessSpec),
+                                               "The requestResourceFuture for 
TasExecutorProcessSpec %s should not be null.", taskExecutorProcessSpec).poll(),
+                                       "The requestResourceFuture queue for 
TasExecutorProcessSpec %s should not be empty.", taskExecutorProcessSpec);
+                       if 
(requestResourceFutures.get(taskExecutorProcessSpec).isEmpty()) {

Review comment:
       No. We have already polled the first element from 
`requestResourceFutures.get(taskExecutorProcessSpec)` above. Here we need to 
check whether the queue has become empty after that poll.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to