Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/4729#discussion_r149540088 --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java --- @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ResourceManagerOptions; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; + +import org.apache.flink.util.TestLogger; + +import 
org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME; +import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +/** + * General tests for the YARN resource manager component. 
+ */ +public class YarnResourceManagerTest extends TestLogger { + + private static final Logger LOG = LoggerFactory.getLogger(YarnResourceManagerTest.class); + + private static Configuration flinkConfig = new Configuration(); + + private static Map<String, String> env = new HashMap<>(); + + private static final Time timeout = Time.seconds(10L); + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Before + public void setup() { + flinkConfig.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 100); + File root = folder.getRoot(); + File home = new File(root, "home"); + boolean created = home.mkdir(); + assertTrue(created); + + env.put(ENV_APP_ID, "foo"); + env.put(ENV_CLIENT_HOME_DIR, home.getAbsolutePath()); + env.put(ENV_CLIENT_SHIP_FILES, ""); + env.put(ENV_FLINK_CLASSPATH, ""); + env.put(ENV_HADOOP_USER_NAME, "foo"); + env.put(FLINK_JAR_PATH, root.toURI().toString()); + } + + @After + public void teardown() { + env.clear(); + } + + static class TestingYarnResourceManager extends YarnResourceManager { + public AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient; + public NMClient mockNMClient; + + public TestingYarnResourceManager( + RpcService rpcService, + String resourceManagerEndpointId, + ResourceID resourceId, + Configuration flinkConfig, + Map<String, String> env, + ResourceManagerConfiguration resourceManagerConfiguration, + HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, + SlotManager slotManager, + MetricRegistry metricRegistry, + JobLeaderIdService jobLeaderIdService, + FatalErrorHandler fatalErrorHandler, + AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient, + NMClient mockNMClient) { + super(rpcService, resourceManagerEndpointId, resourceId, flinkConfig, env, + resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, + slotManager, metricRegistry, jobLeaderIdService, fatalErrorHandler); + this.mockNMClient = mockNMClient; 
+ this.mockResourceManagerClient = mockResourceManagerClient; + } + + public void runInMainThread(Runnable runnable) { + super.getMainThreadExecutor().execute(runnable); + } + + @Override + protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient() { + return mockResourceManagerClient; + } + + @Override + protected NMClient createAndStartNodeManagerClient() { + return mockNMClient; + } + } + + static class Context { + + // services + TestingRpcService rpcService; + TestingFatalErrorHandler fatalErrorHandler; + MockResourceManagerRuntimeServices rmServices; + + // RM + ResourceManagerConfiguration rmConfiguration; + ResourceID rmResourceID; + static final String RM_ADDRESS = "resourceManager"; + TestingYarnResourceManager resourceManager; + + // domain objects for test purposes + final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 200); + + public ContainerId task = ContainerId.newInstance( + ApplicationAttemptId.newInstance(ApplicationId.newInstance(1L, 0), 0), 1); + public String taskHost = "host1"; + + SlotReport slotReport = new SlotReport(); + + public NMClient mockNMClient = mock(NMClient.class); + public AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient = + mock(AMRMClientAsync.class); + + /** + * Create mock RM dependencies. 
+ */ + Context() throws Exception { + rpcService = new TestingRpcService(); + fatalErrorHandler = new TestingFatalErrorHandler(); + rmServices = new MockResourceManagerRuntimeServices(); + + // resource manager + rmConfiguration = new ResourceManagerConfiguration( + Time.seconds(5L), + Time.seconds(5L)); + rmResourceID = ResourceID.generate(); + resourceManager = + new TestingYarnResourceManager( + rpcService, + RM_ADDRESS, + rmResourceID, + flinkConfig, + env, + rmConfiguration, + rmServices.highAvailabilityServices, + rmServices.heartbeatServices, + rmServices.slotManager, + rmServices.metricRegistry, + rmServices.jobLeaderIdService, + fatalErrorHandler, + mockResourceManagerClient, + mockNMClient + ); + } + + /** + * Mock services needed by the resource manager. + */ + class MockResourceManagerRuntimeServices { + + public final ScheduledExecutor scheduledExecutor; + public final TestingHighAvailabilityServices highAvailabilityServices; + public final HeartbeatServices heartbeatServices; + public final MetricRegistry metricRegistry; + public final TestingLeaderElectionService rmLeaderElectionService; + public final JobLeaderIdService jobLeaderIdService; + public final SlotManager slotManager; + + public UUID rmLeaderSessionId; + + MockResourceManagerRuntimeServices() throws Exception { + scheduledExecutor = mock(ScheduledExecutor.class); + highAvailabilityServices = new TestingHighAvailabilityServices(); + rmLeaderElectionService = new TestingLeaderElectionService(); + highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService); + heartbeatServices = new TestingHeartbeatServices(5L, 5L, scheduledExecutor); + metricRegistry = mock(MetricRegistry.class); + slotManager = new SlotManager( + new ScheduledExecutorServiceAdapter(new DirectScheduledExecutorService()), + Time.seconds(10), Time.seconds(10), Time.minutes(1)); + jobLeaderIdService = new JobLeaderIdService( + highAvailabilityServices, + rpcService.getScheduledExecutor(), + 
Time.minutes(5L)); + } + + public void grantLeadership() throws Exception { + rmLeaderSessionId = UUID.randomUUID(); + rmLeaderElectionService.isLeader(rmLeaderSessionId).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + } + } + + /** + * Start the resource manager and grant leadership to it. + */ + public void startResourceManager() throws Exception { + resourceManager.start(); + rmServices.grantLeadership(); + } + + /** + * Stop the Akka actor system. + */ + public void stopResourceManager() throws Exception { + rpcService.stopService(); + } + } + + static class TestContainer extends UtilsTest.TestingContainer { + Resource resource; + Priority priority; + + TestContainer(String id, String host) { + super(id, host); + } + + @Override + public Resource getResource() { + return resource; + } + + @Override + public void setResource(Resource resource) { + this.resource = resource; + } + + @Override + public Priority getPriority() { + return priority; + } + + @Override + public void setPriority(Priority priority) { + this.priority = priority; + } + } + + @Test + public void testStopWorker() throws Exception { + new Context() {{ + startResourceManager(); + // Request slot from SlotManager. + resourceManager.runInMainThread(() -> { + try { + rmServices.slotManager.registerSlotRequest( + new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost)); + } catch (SlotManagerException e) { + e.printStackTrace(); + } + }); + + // Callback from YARN when container is allocated. + Container testingContainer = new TestContainer(task.toString(), taskHost); + testingContainer.setResource(Resource.newInstance(200, 1)); + testingContainer.setPriority(Priority.UNDEFINED); + resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); + + // Remote task executor registers with YarnResourceManager. 
+ TaskExecutorGateway mockTaskExecutorGateway = mock(TaskExecutorGateway.class); --- End diff -- But SimpleAckingTaskExecutorGateway does not implement the RpcGateway interface.
---