tillrohrmann closed pull request #7432: [BP-1.6] [FLINK-10848] Remove container 
requests after successful container allocation
URL: https://github.com/apache/flink/pull/7432
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one opened
from a fork) once it has been merged, the diff is reproduced below for the
sake of provenance:

diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index bcc298073ca..1a0520f68fe 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -167,8 +167,6 @@
                
YARN_CONFIGURATION.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
                
YARN_CONFIGURATION.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
                YARN_CONFIGURATION.setInt(YarnConfiguration.NM_VCORES, 666); // 
memory is overwritten in the MiniYARNCluster.
-               
YARN_CONFIGURATION.set("yarn.scheduler.capacity.resource-calculator",
-                               
"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator");
                // so we have to change the number of cores for testing.
                
YARN_CONFIGURATION.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 20000); 
// 20 seconds expiry (to ensure we properly heartbeat with YARN).
        }
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
index 3327505e32d..e8e55c33303 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
@@ -41,15 +41,20 @@
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.NMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.slf4j.Logger;
 
+import javax.annotation.Nonnull;
+
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -72,6 +77,8 @@
         * Container ID generation may vary across Hadoop versions. */
        static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
 
+       private static final Priority RM_REQUEST_PRIORITY = 
Priority.newInstance(0);
+
        /** The containers where a TaskManager is starting and we are waiting 
for it to register. */
        private final Map<ResourceID, YarnContainerInLaunch> containersInLaunch;
 
@@ -314,6 +321,21 @@ protected void fatalError(String message, Throwable error) 
{
 
        @Override
        protected void requestNewWorkers(int numWorkers) {
+               final Resource capability = getContainerResource();
+
+               for (int i = 0; i < numWorkers; i++) {
+                       numPendingContainerRequests++;
+                       LOG.info("Requesting new TaskManager container with {} 
megabytes memory. Pending requests: {}",
+                               capability.getMemory(), 
numPendingContainerRequests);
+
+                       
resourceManagerClient.addContainerRequest(createContainerRequest(capability));
+               }
+
+               // make sure we transmit the request fast and receive fast news 
of granted allocations
+               
resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
+       }
+
+       private Resource getContainerResource() {
                final long mem = 
taskManagerParameters.taskManagerTotalMemoryMB();
                final int containerMemorySizeMB;
 
@@ -325,25 +347,15 @@ protected void requestNewWorkers(int numWorkers) {
                                mem, containerMemorySizeMB);
                }
 
-               for (int i = 0; i < numWorkers; i++) {
-                       numPendingContainerRequests++;
-                       LOG.info("Requesting new TaskManager container with {} 
megabytes memory. Pending requests: {}",
-                               containerMemorySizeMB, 
numPendingContainerRequests);
-
-                       // Priority for worker containers - priorities are 
intra-application
-                       Priority priority = Priority.newInstance(0);
-
-                       // Resource requirements for worker containers
-                       int taskManagerSlots = taskManagerParameters.numSlots();
-                       int vcores = 
config.getInteger(YarnConfigOptions.VCORES, Math.max(taskManagerSlots, 1));
-                       Resource capability = 
Resource.newInstance(containerMemorySizeMB, vcores);
-
-                       resourceManagerClient.addContainerRequest(
-                               new AMRMClient.ContainerRequest(capability, 
null, null, priority));
-               }
+               // Resource requirements for worker containers
+               int taskManagerSlots = taskManagerParameters.numSlots();
+               int vcores = config.getInteger(YarnConfigOptions.VCORES, 
Math.max(taskManagerSlots, 1));
+               return Resource.newInstance(containerMemorySizeMB, vcores);
+       }
 
-               // make sure we transmit the request fast and receive fast news 
of granted allocations
-               
resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
+       @Nonnull
+       private AMRMClient.ContainerRequest createContainerRequest(Resource 
capability) {
+               return new AMRMClient.ContainerRequest(capability, null, null, 
RM_REQUEST_PRIORITY);
        }
 
        @Override
@@ -434,12 +446,17 @@ private void containersAllocated(List<Container> 
containers) {
                final int numRequired = getDesignatedWorkerPoolSize();
                final int numRegistered = getNumberOfStartedTaskManagers();
 
+               final Collection<AMRMClient.ContainerRequest> pendingRequests = 
getPendingRequests();
+               final Iterator<AMRMClient.ContainerRequest> 
pendingRequestsIterator = pendingRequests.iterator();
+
                for (Container container : containers) {
+                       if (numPendingContainerRequests > 0) {
+                               numPendingContainerRequests -= 1;
+                               
resourceManagerClient.removeContainerRequest(pendingRequestsIterator.next());
+                       }
                        numPendingContainerRequests = Math.max(0, 
numPendingContainerRequests - 1);
                        LOG.info("Received new container: {} - Remaining 
pending container requests: {}",
                                container.getId(), numPendingContainerRequests);
-                       resourceManagerClient.removeContainerRequest(new 
AMRMClient.ContainerRequest(
-                                       container.getResource(), null, null, 
container.getPriority()));
 
                        // decide whether to return the container, or whether 
to start a TaskManager
                        if (numRegistered + containersInLaunch.size() < 
numRequired) {
@@ -489,6 +506,24 @@ private void containersAllocated(List<Container> 
containers) {
                triggerCheckWorkers();
        }
 
+       private Collection<AMRMClient.ContainerRequest> getPendingRequests() {
+               final List<? extends Collection<AMRMClient.ContainerRequest>> 
matchingRequests = 
resourceManagerClient.getMatchingRequests(RM_REQUEST_PRIORITY, 
ResourceRequest.ANY, getContainerResource());
+
+               final Collection<AMRMClient.ContainerRequest> result;
+
+               if (matchingRequests.isEmpty()) {
+                       result = Collections.emptyList();
+               } else {
+                       result = new ArrayList<>(matchingRequests.get(0));
+               }
+
+               Preconditions.checkState(
+                       result.size() == numPendingContainerRequests,
+                       "The RMClient's and YarnResourceManagers internal state 
about the number of pending container requests has diverged. Number client's 
pending container requests %s != Number RM's pending container requests %s.", 
result.size(), numPendingContainerRequests);
+
+               return result;
+       }
+
        /**
         * Invoked when the ResourceManager informs of completed containers.
         * Called via an actor message by the callback from the ResourceManager 
client.
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 9d9d21bc5b8..fc2aaa332b5 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
@@ -42,6 +43,7 @@
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -53,14 +55,20 @@
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.NMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -73,6 +81,7 @@
  */
 public class YarnResourceManager extends ResourceManager<YarnWorkerNode> 
implements AMRMClientAsync.CallbackHandler {
 
+       private static final Priority RM_REQUEST_PRIORITY = 
Priority.newInstance(1);
        /** The process environment variables. */
        private final Map<String, String> env;
 
@@ -117,6 +126,8 @@
 
        private final Map<ResourceProfile, Integer> resourcePriorities = new 
HashMap<>();
 
+       private final Resource resource;
+
        public YarnResourceManager(
                        RpcService rpcService,
                        String resourceManagerEndpointId,
@@ -169,6 +180,8 @@ public YarnResourceManager(
                this.numberOfTaskSlots = 
flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
                this.defaultTaskManagerMemoryMB = 
ConfigurationUtils.getTaskManagerHeapMemory(flinkConfig).getMebiBytes();
                this.defaultCpus = 
flinkConfig.getInteger(YarnConfigOptions.VCORES, numberOfTaskSlots);
+
+               this.resource = 
Resource.newInstance(defaultTaskManagerMemoryMB, defaultCpus);
        }
 
        protected AMRMClientAsync<AMRMClient.ContainerRequest> 
createAndStartResourceManagerClient(
@@ -290,13 +303,15 @@ protected void internalDeregisterApplication(
 
        @Override
        public void startNewWorker(ResourceProfile resourceProfile) {
-               // Priority for worker containers - priorities are 
intra-application
-               //TODO: set priority according to the resource allocated
-               Priority priority = 
Priority.newInstance(generatePriority(resourceProfile));
-               int mem = resourceProfile.getMemoryInMB() < 0 ? 
defaultTaskManagerMemoryMB : resourceProfile.getMemoryInMB();
-               int vcore = resourceProfile.getCpuCores() < 1 ? defaultCpus : 
(int) resourceProfile.getCpuCores();
-               Resource capability = Resource.newInstance(mem, vcore);
-               requestYarnContainer(capability, priority);
+               Preconditions.checkArgument(
+                       ResourceProfile.UNKNOWN.equals(resourceProfile),
+                       "The YarnResourceManager does not support custom 
ResourceProfiles yet. It assumes that all containers have the same resources.");
+               requestYarnContainer();
+       }
+
+       @VisibleForTesting
+       Resource getContainerResource() {
+               return resource;
        }
 
        @Override
@@ -339,8 +354,7 @@ public void onContainersCompleted(final 
List<ContainerStatus> statuses) {
 
                                        if (yarnWorkerNode != null) {
                                                // Container completed 
unexpectedly ~> start a new one
-                                               final Container container = 
yarnWorkerNode.getContainer();
-                                               
requestYarnContainer(container.getResource(), 
yarnWorkerNode.getContainer().getPriority());
+                                               requestYarnContainer();
                                        }
                                        // Eagerly close the connection with 
task manager.
                                        closeTaskManagerConnection(resourceId, 
new Exception(containerStatus.getDiagnostics()));
@@ -352,15 +366,17 @@ public void onContainersCompleted(final 
List<ContainerStatus> statuses) {
        @Override
        public void onContainersAllocated(List<Container> containers) {
                runAsync(() -> {
+                       final Collection<AMRMClient.ContainerRequest> 
pendingRequests = getPendingRequests();
+                       final Iterator<AMRMClient.ContainerRequest> 
pendingRequestsIterator = pendingRequests.iterator();
+
                        for (Container container : containers) {
                                log.info(
                                        "Received new container: {} - Remaining 
pending container requests: {}",
                                        container.getId(),
                                        numPendingContainerRequests);
-                               
resourceManagerClient.removeContainerRequest(new AMRMClient.ContainerRequest(
-                                               container.getResource(), null, 
null, container.getPriority()));
+
                                if (numPendingContainerRequests > 0) {
-                                       numPendingContainerRequests--;
+                                       
removeContainerRequest(pendingRequestsIterator.next());
 
                                        final String containerIdStr = 
container.getId().toString();
                                        final ResourceID resourceId = new 
ResourceID(containerIdStr);
@@ -382,7 +398,7 @@ public void onContainersAllocated(List<Container> 
containers) {
                                                
workerNodeMap.remove(resourceId);
                                                
resourceManagerClient.releaseAssignedContainer(container.getId());
                                                // and ask for a new one
-                                               
requestYarnContainer(container.getResource(), container.getPriority());
+                                               requestYarnContainer();
                                        }
                                } else {
                                        // return the excessive containers
@@ -399,6 +415,36 @@ public void onContainersAllocated(List<Container> 
containers) {
                });
        }
 
+       private void removeContainerRequest(AMRMClient.ContainerRequest 
pendingContainerRequest) {
+               numPendingContainerRequests--;
+
+               log.info("Removing container request {}. Pending container 
requests {}.", pendingContainerRequest, pendingContainerRequest);
+
+               
resourceManagerClient.removeContainerRequest(pendingContainerRequest);
+       }
+
+       private Collection<AMRMClient.ContainerRequest> getPendingRequests() {
+               final List<? extends Collection<AMRMClient.ContainerRequest>> 
matchingRequests = resourceManagerClient.getMatchingRequests(
+                       RM_REQUEST_PRIORITY,
+                       ResourceRequest.ANY,
+                       getContainerResource());
+
+               final Collection<AMRMClient.ContainerRequest> 
matchingContainerRequests;
+
+               if (matchingRequests.isEmpty()) {
+                       matchingContainerRequests = Collections.emptyList();
+               } else {
+                       final Collection<AMRMClient.ContainerRequest> 
collection = matchingRequests.get(0);
+                       matchingContainerRequests = new ArrayList<>(collection);
+               }
+
+               Preconditions.checkState(
+                       matchingContainerRequests.size() == 
numPendingContainerRequests,
+                       "The RMClient's and YarnResourceManagers internal state 
about the number of pending container requests has diverged. Number client's 
pending container requests %s != Number RM's pending container requests %s.", 
matchingContainerRequests.size(), numPendingContainerRequests);
+
+               return matchingContainerRequests;
+       }
+
        @Override
        public void onShutdownRequest() {
                shutDown();
@@ -453,11 +499,11 @@ private FinalApplicationStatus 
getYarnStatus(ApplicationStatus status) {
        /**
         * Request new container if pending containers cannot satisfies pending 
slot requests.
         */
-       private void requestYarnContainer(Resource resource, Priority priority) 
{
+       private void requestYarnContainer() {
                int pendingSlotRequests = getNumberPendingSlotRequests();
                int pendingSlotAllocation = numPendingContainerRequests * 
numberOfTaskSlots;
                if (pendingSlotRequests > pendingSlotAllocation) {
-                       resourceManagerClient.addContainerRequest(new 
AMRMClient.ContainerRequest(resource, null, null, priority));
+                       
resourceManagerClient.addContainerRequest(getContainerRequest());
 
                        // make sure we transmit the request fast and receive 
fast news of granted allocations
                        
resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
@@ -470,6 +516,16 @@ private void requestYarnContainer(Resource resource, 
Priority priority) {
                }
        }
 
+       @Nonnull
+       @VisibleForTesting
+       AMRMClient.ContainerRequest getContainerRequest() {
+               return new AMRMClient.ContainerRequest(
+                       getContainerResource(),
+                       null,
+                       null,
+                       RM_REQUEST_PRIORITY);
+       }
+
        private ContainerLaunchContext createTaskExecutorLaunchContext(Resource 
resource, String containerId, String host)
                        throws Exception {
                // init the ContainerLaunchContext
@@ -506,22 +562,4 @@ private ContainerLaunchContext 
createTaskExecutorLaunchContext(Resource resource
                                .put(ENV_FLINK_NODE_ID, host);
                return taskExecutorLaunchContext;
        }
-
-
-
-       /**
-        * Generate priority by given resource profile.
-        * Priority is only used for distinguishing request of different 
resource.
-        * @param resourceProfile The resource profile of a request
-        * @return The priority of this resource profile.
-        */
-       private int generatePriority(ResourceProfile resourceProfile) {
-               if (resourcePriorities.containsKey(resourceProfile)) {
-                       return resourcePriorities.get(resourceProfile);
-               } else {
-                       int priority = resourcePriorities.size();
-                       resourcePriorities.put(resourceProfile, priority);
-                       return priority;
-               }
-       }
 }
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
index d665df6bc7c..3489726ddd1 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
@@ -52,9 +52,7 @@
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -71,11 +69,11 @@
 import scala.concurrent.duration.FiniteDuration;
 
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.any;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /**
@@ -130,25 +128,9 @@ public void 
testYarnFlinkResourceManagerJobManagerLostLeadership() throws Except
                                                        1),
                                                i));
                                
when(mockContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 
1234));
-                               
when(mockContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
-                               
when(mockContainer.getPriority()).thenReturn(Priority.UNDEFINED);
                                containerList.add(mockContainer);
                        }
 
-                       doAnswer(new Answer() {
-                               int counter = 0;
-                               @Override
-                               public Object answer(InvocationOnMock 
invocation) throws Throwable {
-                                       if (counter < containerList.size()) {
-                                               
callbackHandler.onContainersAllocated(
-                                                       
Collections.singletonList(
-                                                               
containerList.get(counter++)
-                                                       ));
-                                       }
-                                       return null;
-                               }
-                       
}).when(resourceManagerClient).addContainerRequest(Matchers.any(AMRMClient.ContainerRequest.class));
-
                        final CompletableFuture<AkkaActorGateway> 
resourceManagerFuture = new CompletableFuture<>();
                        final CompletableFuture<AkkaActorGateway> 
leaderGatewayFuture = new CompletableFuture<>();
 
@@ -166,8 +148,8 @@ public Object answer(InvocationOnMock invocation) throws 
Throwable {
                                })
                                .when(nodeManagerClient)
                                .startContainer(
-                                       Matchers.any(Container.class),
-                                       
Matchers.any(ContainerLaunchContext.class));
+                                       any(Container.class),
+                                       any(ContainerLaunchContext.class));
 
                        ActorRef resourceManager = null;
                        ActorRef leader1;
@@ -198,6 +180,9 @@ public Object answer(InvocationOnMock invocation) throws 
Throwable {
                                                nodeManagerClient
                                        ));
 
+                               
doReturn(Collections.singletonList(Collections.nCopies(numInitialTaskManagers, 
new AMRMClient.ContainerRequest(Resource.newInstance(1024 * 1024, 1), null, 
null, Priority.newInstance(0)))))
+                                       
.when(resourceManagerClient).getMatchingRequests(any(Priority.class), 
anyString(), any(Resource.class));
+
                                
leaderRetrievalService.notifyListener(leader1.path().toString(), 
leaderSessionID);
 
                                final AkkaActorGateway leader1Gateway = new 
AkkaActorGateway(leader1, leaderSessionID);
@@ -210,6 +195,8 @@ public Object answer(InvocationOnMock invocation) throws 
Throwable {
 
                                resourceManagerGateway.tell(new 
RegisterResourceManagerSuccessful(leader1, Collections.emptyList()));
 
+                               
callbackHandler.onContainersAllocated(containerList);
+
                                for (int i = 0; i < containerList.size(); i++) {
                                        expectMsgClass(deadline.timeLeft(), 
Acknowledge.class);
                                }
@@ -240,8 +227,6 @@ public Object answer(InvocationOnMock invocation) throws 
Throwable {
 
                                int numberOfRegisteredResources = (Integer) 
Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft());
 
-                               verify(resourceManagerClient, 
times(numInitialTaskManagers)).removeContainerRequest(
-                                               
any(AMRMClient.ContainerRequest.class));
                                assertEquals(numInitialTaskManagers, 
numberOfRegisteredResources);
                        } finally {
                                if (resourceManager != null) {
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 8b583306807..85e3d531f3d 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -105,7 +105,9 @@
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -233,7 +235,7 @@ protected void runAsync(final Runnable runnable) {
                final HardwareDescription hardwareDescription = new 
HardwareDescription(1, 2L, 3L, 4L);
 
                // domain objects for test purposes
-               final ResourceProfile resourceProfile1 = new 
ResourceProfile(1.0, 200);
+               final ResourceProfile resourceProfile1 = 
ResourceProfile.UNKNOWN;
 
                public ContainerId task = ContainerId.newInstance(
                                
ApplicationAttemptId.newInstance(ApplicationId.newInstance(1L, 0), 0), 1);
@@ -357,10 +359,12 @@ public void testStopWorker() throws Exception {
                        
when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 
1234));
                        
when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
                        
when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
+
+                       
doReturn(Collections.singletonList(Collections.singletonList(resourceManager.getContainerRequest())))
+                               
.when(mockResourceManagerClient).getMatchingRequests(any(Priority.class), 
anyString(), any(Resource.class));
+
                        
resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
                        
verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
-                       
verify(mockResourceManagerClient).removeContainerRequest(
-                                       any(AMRMClient.ContainerRequest.class));
                        
verify(mockNMClient).startContainer(eq(testingContainer), 
any(ContainerLaunchContext.class));
 
                        // Remote task executor registers with 
YarnResourceManager.
@@ -459,6 +463,9 @@ public void testOnContainerCompleted() throws Exception {
                                        1),
                                1);
 
+                       
doReturn(Collections.singletonList(Collections.singletonList(resourceManager.getContainerRequest())))
+                               
.when(mockResourceManagerClient).getMatchingRequests(any(Priority.class), 
anyString(), any(Resource.class));
+
                        // Callback from YARN when container is allocated.
                        Container testingContainer = mock(Container.class);
                        
when(testingContainer.getId()).thenReturn(testContainerId);
@@ -466,9 +473,8 @@ public void testOnContainerCompleted() throws Exception {
                        
when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
                        
when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
                        
resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
+                       
verify(mockResourceManagerClient).removeContainerRequest(any(AMRMClient.ContainerRequest.class));
                        
verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
-                       
verify(mockResourceManagerClient).removeContainerRequest(
-                                       any(AMRMClient.ContainerRequest.class));
                        
verify(mockNMClient).startContainer(eq(testingContainer), 
any(ContainerLaunchContext.class));
 
                        // Callback from YARN when container is Completed, 
pending request can not be fulfilled by pending
diff --git a/flink-yarn/src/test/resources/log4j-test.properties 
b/flink-yarn/src/test/resources/log4j-test.properties
index 2226f686531..5b1e4ed8e7a 100644
--- a/flink-yarn/src/test/resources/log4j-test.properties
+++ b/flink-yarn/src/test/resources/log4j-test.properties
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
 
 # A1 is set to be a ConsoleAppender.
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to