KarmaGYZ commented on a change in pull request #13311:
URL: https://github.com/apache/flink/pull/13311#discussion_r502162885



##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -0,0 +1,609 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptionsInternal;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver;
+import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.webmonitor.history.HistoryServerUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
+import org.apache.flink.yarn.configuration.YarnResourceManagerConfiguration;
+
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation of {@link ResourceManagerDriver} for Yarn deployment.
+ */
+public class YarnResourceManagerDriver extends AbstractResourceManagerDriver<YarnWorkerNode> {
+
+       private static final Priority RM_REQUEST_PRIORITY = Priority.newInstance(1);
+
+       /** Environment variable name of the hostname given by YARN.
+        * In the task executor we use the hostnames given by YARN consistently throughout Akka. */
+       private static final String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID";
+
+       static final String ERROR_MESSAGE_ON_SHUTDOWN_REQUEST = "Received shutdown request from YARN ResourceManager.";
+
+       private final YarnConfiguration yarnConfig;
+
+       /** The process environment variables. */
+       private final YarnResourceManagerConfiguration configuration;
+
+       /** Default heartbeat interval between this resource manager and the YARN ResourceManager. */
+       private final int yarnHeartbeatIntervalMillis;
+
+       /** Client to communicate with the Resource Manager (YARN's master). */
+       private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;
+
+       /** The heartbeat interval while the resource master is waiting for containers. */
+       private final int containerRequestHeartbeatIntervalMillis;
+
+       /** Client to communicate with the Node manager and launch TaskExecutor processes. */
+       private NMClientAsync nodeManagerClient;
+
+       /** Request resource futures, keyed by TaskExecutorProcessSpec. */
+       private final Map<TaskExecutorProcessSpec, Queue<CompletableFuture<YarnWorkerNode>>> requestResourceFutures;
+
+       private final TaskExecutorProcessSpecContainerResourceAdapter taskExecutorProcessSpecContainerResourceAdapter;
+
+       private final RegisterApplicationMasterResponseReflector registerApplicationMasterResponseReflector;
+
+       private TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy matchingStrategy;
+
+       private final YarnResourceManagerClientFactory yarnResourceManagerClientFactory;
+
+       private final YarnNodeManagerClientFactory yarnNodeManagerClientFactory;
+
+       public YarnResourceManagerDriver(
+               Configuration flinkConfig,
+               YarnResourceManagerConfiguration configuration,
+               YarnResourceManagerClientFactory yarnResourceManagerClientFactory,
+               YarnNodeManagerClientFactory yarnNodeManagerClientFactory) {
+               super(flinkConfig, GlobalConfiguration.loadConfiguration(configuration.getCurrentDir()));
+
+               this.yarnConfig = new YarnConfiguration();
+               this.requestResourceFutures = new HashMap<>();
+               this.configuration = configuration;
+
+               final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
+                       YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000;
+
+               final long yarnExpiryIntervalMS = yarnConfig.getLong(
+                       YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
+                       YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
+
+               if (yarnHeartbeatIntervalMS >= yarnExpiryIntervalMS) {
+                       log.warn("The heartbeat interval of the Flink Application master ({}) is greater " +
+                                       "than YARN's expiry interval ({}). The application is likely to be killed by YARN.",
+                               yarnHeartbeatIntervalMS, yarnExpiryIntervalMS);
+               }
+               yarnHeartbeatIntervalMillis = yarnHeartbeatIntervalMS;
+               containerRequestHeartbeatIntervalMillis = flinkConfig.getInteger(YarnConfigOptions.CONTAINER_REQUEST_HEARTBEAT_INTERVAL_MILLISECONDS);
+
+               this.taskExecutorProcessSpecContainerResourceAdapter = Utils.createTaskExecutorProcessSpecContainerResourceAdapter(flinkConfig, yarnConfig);
+               this.registerApplicationMasterResponseReflector = new RegisterApplicationMasterResponseReflector(log);
+
+               this.matchingStrategy = flinkConfig.getBoolean(YarnConfigOptionsInternal.MATCH_CONTAINER_VCORES) ?
+                       TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy.MATCH_VCORE :
+                       TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE;
+
+               this.yarnResourceManagerClientFactory = yarnResourceManagerClientFactory;
+               this.yarnNodeManagerClientFactory = yarnNodeManagerClientFactory;
+       }
+
+       // ------------------------------------------------------------------------
+       //  ResourceManagerDriver
+       // ------------------------------------------------------------------------
+
+       @Override
+       protected void initializeInternal() throws Exception {
+               final YarnContainerEventHandler yarnContainerEventHandler = new YarnContainerEventHandler();
+               try {
+                       resourceManagerClient = yarnResourceManagerClientFactory.createResourceManagerClient(
+                               yarnHeartbeatIntervalMillis,
+                               yarnContainerEventHandler);
+                       resourceManagerClient.init(yarnConfig);
+                       resourceManagerClient.start();
+
+                       final RegisterApplicationMasterResponse registerApplicationMasterResponse = registerApplicationMaster();
+                       getContainersFromPreviousAttempts(registerApplicationMasterResponse);
+                       updateMatchingStrategy(registerApplicationMasterResponse);
+               } catch (Exception e) {
+                       throw new ResourceManagerException("Could not start resource manager client.", e);
+               }
+
+               nodeManagerClient = yarnNodeManagerClientFactory.createNodeManagerClient(yarnContainerEventHandler);
+               nodeManagerClient.init(yarnConfig);
+               nodeManagerClient.start();
+       }
+
+       @Override
+       public CompletableFuture<Void> terminate() {
+               // shut down all components
+               Exception exception = null;
+
+               if (resourceManagerClient != null) {
+                       try {
+                               resourceManagerClient.stop();
+                       } catch (Exception e) {
+                               exception = e;
+                       }
+               }
+
+               if (nodeManagerClient != null) {
+                       try {
+                               nodeManagerClient.stop();
+                       } catch (Exception e) {
+                               exception = ExceptionUtils.firstOrSuppressed(e, exception);
+                       }
+               }
+
+               return exception == null ?
+                       FutureUtils.completedVoidFuture() :
+                       FutureUtils.completedExceptionally(exception);
+       }
+
+       @Override
+       public void deregisterApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) {
+               // first, de-register from YARN
+               final FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus);
+               log.info("Unregister application from the YARN Resource Manager with final status {}.", yarnStatus);
+
+               final Optional<URL> historyServerURL = HistoryServerUtils.getHistoryServerURL(flinkConfig);
+
+               final String appTrackingUrl = historyServerURL.map(URL::toString).orElse("");
+
+               try {
+                       resourceManagerClient.unregisterApplicationMaster(yarnStatus, optionalDiagnostics, appTrackingUrl);
+               } catch (Throwable t) {
+                       log.error("Could not unregister the application master.", t);
+               }
+
+               Utils.deleteApplicationFiles(configuration.getYarnFiles());
+       }
+
+       @Override
+       public CompletableFuture<YarnWorkerNode> requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
+               final Optional<Resource> containerResourceOptional = getContainerResource(taskExecutorProcessSpec);
+               final CompletableFuture<YarnWorkerNode> requestResourceFuture = new CompletableFuture<>();
+
+               if (containerResourceOptional.isPresent()) {
+                       resourceManagerClient.addContainerRequest(getContainerRequest(containerResourceOptional.get()));
+
+                       // make sure we transmit the request fast and receive fast news of granted allocations
+                       resourceManagerClient.setHeartbeatInterval(containerRequestHeartbeatIntervalMillis);
+
+                       requestResourceFutures.computeIfAbsent(taskExecutorProcessSpec, ignore -> new LinkedList<>()).add(requestResourceFuture);
+
+                       log.info("Requesting new TaskExecutor container with resource {}.", taskExecutorProcessSpec);
+               } else {
+                       requestResourceFuture.completeExceptionally(
+                               new ResourceManagerException(String.format("Could not compute the container Resource from the given TaskExecutorProcessSpec %s.", taskExecutorProcessSpec)));
+               }
+
+               return requestResourceFuture;
+       }
+
+       @Override
+       public void releaseResource(YarnWorkerNode workerNode) {
+               final Container container = workerNode.getContainer();
+               log.info("Stopping container {}.", workerNode.getResourceID().getStringWithMetadata());
+               nodeManagerClient.stopContainerAsync(container.getId(), container.getNodeId());
+               resourceManagerClient.releaseAssignedContainer(container.getId());
+       }
+
+       // ------------------------------------------------------------------------
+       //  Internal
+       // ------------------------------------------------------------------------
+
+       private void onContainersOfResourceAllocated(Resource resource, List<Container> containers) {
+               final List<TaskExecutorProcessSpec> pendingTaskExecutorProcessSpecs =
+                       taskExecutorProcessSpecContainerResourceAdapter.getTaskExecutorProcessSpec(resource, matchingStrategy).stream()
+                               .flatMap(spec -> Collections.nCopies(getNumRequestedNotAllocatedWorkersFor(spec), spec).stream())
+                               .collect(Collectors.toList());
+
+               int numPending = pendingTaskExecutorProcessSpecs.size();
+               log.info("Received {} containers with resource {}, {} pending container requests.",
+                       containers.size(),
+                       resource,
+                       numPending);
+
+               final Iterator<Container> containerIterator = containers.iterator();
+               final Iterator<TaskExecutorProcessSpec> pendingTaskExecutorProcessSpecIterator = pendingTaskExecutorProcessSpecs.iterator();
+               final Iterator<AMRMClient.ContainerRequest> pendingRequestsIterator =
+                       getPendingRequestsAndCheckConsistency(resource, pendingTaskExecutorProcessSpecs.size()).iterator();
+
+               int numAccepted = 0;
+               while (containerIterator.hasNext() && pendingTaskExecutorProcessSpecIterator.hasNext()) {
+                       final TaskExecutorProcessSpec taskExecutorProcessSpec = pendingTaskExecutorProcessSpecIterator.next();
+                       final Container container = containerIterator.next();
+                       final AMRMClient.ContainerRequest pendingRequest = pendingRequestsIterator.next();
+                       final ResourceID resourceId = getContainerResourceId(container);
+                       final CompletableFuture<YarnWorkerNode> requestResourceFuture =
+                               Preconditions.checkNotNull(
+                                       Preconditions.checkNotNull(
+                                               requestResourceFutures.get(taskExecutorProcessSpec),
+                                               "The requestResourceFuture for TaskExecutorProcessSpec %s should not be null.", taskExecutorProcessSpec).poll(),
+                                       "The requestResourceFuture queue for TaskExecutorProcessSpec %s should not be empty.", taskExecutorProcessSpec);
+                       if (requestResourceFutures.get(taskExecutorProcessSpec).isEmpty()) {
+                               requestResourceFutures.remove(taskExecutorProcessSpec);
+                       }
+
+                       startTaskExecutorInContainer(container, taskExecutorProcessSpec, resourceId, requestResourceFuture);
+                       removeContainerRequest(pendingRequest);
+
+                       numAccepted++;
+               }
+               numPending -= numAccepted;
+
+               int numExcess = 0;
+               while (containerIterator.hasNext()) {
+                       returnExcessContainer(containerIterator.next());
+                       numExcess++;
+               }
+
+               log.info("Accepted {} requested containers, returned {} excess containers, {} pending container requests of resource {}.",
+                       numAccepted, numExcess, numPending, resource);
+       }
+
+       private int getNumRequestedNotAllocatedWorkers() {
+               return requestResourceFutures.values().stream().mapToInt(Queue::size).sum();
+       }
+
+       private int getNumRequestedNotAllocatedWorkersFor(TaskExecutorProcessSpec taskExecutorProcessSpec) {
+               return requestResourceFutures.getOrDefault(taskExecutorProcessSpec, new LinkedList<>()).size();
+       }
+
+       private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainerRequest) {
+               log.info("Removing container request {}.", pendingContainerRequest);
+               resourceManagerClient.removeContainerRequest(pendingContainerRequest);
+       }
+
+       private void returnExcessContainer(Container excessContainer) {
+               log.info("Returning excess container {}.", excessContainer.getId());
+               resourceManagerClient.releaseAssignedContainer(excessContainer.getId());
+       }
+
+       private void startTaskExecutorInContainer(Container container, TaskExecutorProcessSpec taskExecutorProcessSpec, ResourceID resourceId, CompletableFuture<YarnWorkerNode> requestResourceFuture) {
+               final YarnWorkerNode yarnWorkerNode = new YarnWorkerNode(container, resourceId);
+
+               try {
+                       // Context information used to start a TaskExecutor Java process
+                       ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
+                               resourceId,
+                               container.getNodeId().getHost(),
+                               taskExecutorProcessSpec);
+
+                       nodeManagerClient.startContainerAsync(container, taskExecutorLaunchContext);
+                       requestResourceFuture.complete(yarnWorkerNode);
+               } catch (Throwable t) {
+                       requestResourceFuture.completeExceptionally(t);
+               }
+       }
+
+       private Collection<AMRMClient.ContainerRequest> getPendingRequestsAndCheckConsistency(Resource resource, int expectedNum) {
+               final Collection<Resource> equivalentResources = taskExecutorProcessSpecContainerResourceAdapter.getEquivalentContainerResource(resource, matchingStrategy);
+               final List<? extends Collection<AMRMClient.ContainerRequest>> matchingRequests =
+                       equivalentResources.stream()
+                               .flatMap(equivalentResource -> resourceManagerClient.getMatchingRequests(
+                                       RM_REQUEST_PRIORITY,
+                                       ResourceRequest.ANY,
+                                       equivalentResource).stream())
+                               .collect(Collectors.toList());
+
+               final Collection<AMRMClient.ContainerRequest> matchingContainerRequests;
+
+               if (matchingRequests.isEmpty()) {
+                       matchingContainerRequests = Collections.emptyList();
+               } else {
+                       final Collection<AMRMClient.ContainerRequest> collection = matchingRequests.get(0);
+                       matchingContainerRequests = new ArrayList<>(collection);
+               }

Review comment:
       I think this is a valid issue. According to the [description](https://github.com/apache/hadoop/blob/deb35a32bafdd3065e3c2f243d84ef79209838e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java#L684) of this interface, it returns all the requests that could fit into the given resource. However, its [implementation](https://github.com/apache/hadoop/blob/deb35a32bafdd3065e3c2f243d84ef79209838e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java#L734) first tries to return the exactly matching requests, and only falls back to the aforementioned logic if none are found.
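
   To make the difference concrete, here is a minimal sketch of how I read that lookup for the call in `getPendingRequestsAndCheckConsistency` (illustrative only; it just reuses `resourceManagerClient`, `RM_REQUEST_PRIORITY` and `ResourceRequest.ANY` from the diff above):

   ```java
   // Illustrative only, not proposing this for the PR. `resource` is the container Resource we
   // are matching against; resourceManagerClient and RM_REQUEST_PRIORITY are the fields above.
   // My reading of AMRMClientImpl#getMatchingRequests: if there are outstanding requests whose
   // capability equals `resource` exactly, only those are returned; otherwise it falls back to
   // returning every outstanding request that would fit into `resource`, as the interface
   // description says.
   final List<? extends Collection<AMRMClient.ContainerRequest>> matching =
           resourceManagerClient.getMatchingRequests(RM_REQUEST_PRIORITY, ResourceRequest.ANY, resource);

   // E.g. with pending requests of <2048 MB, 1 vcore> and <1024 MB, 1 vcore>, a lookup with
   // resource = <2048 MB, 1 vcore> returns only the exact match, not both "fitting" requests.
   ```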
   
   Recently, in [FLINK-19324](https://issues.apache.org/jira/browse/FLINK-19324), we found that we may need to use priorities to map the different requests. If we go that way, this issue should be fixed as well, from my understanding (a rough sketch of what I mean is below). Either way, I would consider this issue out of the scope of this PR. WDYT?
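
   A rough sketch of the priority-based mapping idea (purely illustrative, not proposed for this PR; the helper names and the priority allocation scheme are made up):

   ```java
   // Purely illustrative: give each TaskExecutorProcessSpec its own YARN priority so that
   // getMatchingRequests, keyed by that priority, can never mix up requests of different specs.
   private final Map<TaskExecutorProcessSpec, Priority> priorities = new HashMap<>();
   private int nextPriority = 1;

   private Priority getPriority(TaskExecutorProcessSpec spec) {
           return priorities.computeIfAbsent(spec, ignored -> Priority.newInstance(nextPriority++));
   }

   private AMRMClient.ContainerRequest createContainerRequestFor(TaskExecutorProcessSpec spec, Resource resource) {
           // null nodes/racks = no locality constraint.
           return new AMRMClient.ContainerRequest(resource, null, null, getPriority(spec));
   }
   ```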




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

