TisonKun commented on a change in pull request #6953: [FLINK-10558] [yarn-test] 
Port YARNHighAvailabilityITCase to new code…
URL: https://github.com/apache/flink/pull/6953#discussion_r247201616
 
 

 ##########
 File path: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
 ##########
 @@ -18,198 +18,207 @@
 
 package org.apache.flink.yarn;
 
-import org.apache.flink.client.deployment.ClusterSpecification;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.util.YarnTestUtils;
+
+import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Rule;
+import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.util.Arrays;
+import java.text.NumberFormat;
+import java.util.EnumSet;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
+import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
-import static org.junit.Assume.assumeTrue;
-
 /**
  * Tests that verify correct HA behavior.
  */
 public class YARNHighAvailabilityITCase extends YarnTestBase {
 
-       private static TestingServer zkServer;
+       private static final Logger LOG = 
LoggerFactory.getLogger(YARNHighAvailabilityITCase.class);
 
-       private static ActorSystem actorSystem;
+       @ClassRule
+       public static final TemporaryFolder FOLDER = new TemporaryFolder();
 
-       private static final int numberApplicationAttempts = 3;
+       private static final String LOG_DIR = "flink-yarn-tests-ha";
+       private static final NumberFormat FORMAT = NumberFormat.getInstance();
+       private static final Pattern PATTERN = 
Pattern.compile("(Source|Sink).*switched from DEPLOYING to RUNNING");
+       private static final FiniteDuration TIMEOUT = 
FiniteDuration.apply(200000L, TimeUnit.MILLISECONDS);
 
-       @Rule
-       public TemporaryFolder temp = new TemporaryFolder();
+       private static TestingServer zkServer;
+       private static String storageDir;
+       private static String zkQuorum;
 
        @BeforeClass
        public static void setup() {
-               actorSystem = AkkaUtils.createDefaultActorSystem();
-
                try {
                        zkServer = new TestingServer();
                        zkServer.start();
+
+                       storageDir = FOLDER.newFolder().getAbsolutePath();
+                       zkQuorum = zkServer.getConnectString();
                } catch (Exception e) {
                        e.printStackTrace();
-                       Assert.fail("Could not start ZooKeeper testing 
cluster.");
+                       Assert.fail("Cannot start ZooKeeper Server.");
                }
 
-               YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, 
"flink-yarn-tests-ha");
-               YARN_CONFIGURATION.set(YarnConfiguration.RM_AM_MAX_ATTEMPTS, "" 
+ numberApplicationAttempts);
+               FORMAT.setGroupingUsed(false);
+               FORMAT.setMinimumIntegerDigits(4);
 
+               // startYARNWithConfig should be implemented by subclass
+               YARN_CONFIGURATION.setClass(YarnConfiguration.RM_SCHEDULER, 
CapacityScheduler.class, ResourceScheduler.class);
+               YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, 
LOG_DIR);
+               YARN_CONFIGURATION.setInt(YarnConfiguration.NM_PMEM_MB, 4096);
                startYARNWithConfig(YARN_CONFIGURATION);
        }
 
        @AfterClass
        public static void teardown() throws Exception {
                if (zkServer != null) {
                        zkServer.stop();
+                       zkServer = null;
                }
-
-               JavaTestKit.shutdownActorSystem(actorSystem);
-               actorSystem = null;
        }
 
-       /**
-        * Tests that the application master can be killed multiple times and 
that the surviving
-        * TaskManager successfully reconnects to the newly started JobManager.
-        * @throws Exception
-        */
        @Test
-       public void testMultipleAMKill() throws Exception {
-               assumeTrue("This test only works with the old actor based 
code.", !isNewMode);
-               final int numberKillingAttempts = numberApplicationAttempts - 1;
-               String confDirPath = 
System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
-               final Configuration configuration = 
GlobalConfiguration.loadConfiguration();
-               TestingYarnClusterDescriptor flinkYarnClient = new 
TestingYarnClusterDescriptor(
-                       configuration,
-                       getYarnConfiguration(),
-                       confDirPath,
-                       getYarnClient(),
-                       true);
-
-               Assert.assertNotNull("unable to get yarn client", 
flinkYarnClient);
-               flinkYarnClient.setLocalJarPath(new 
Path(flinkUberjar.getAbsolutePath()));
-               
flinkYarnClient.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
-
-               String fsStateHandlePath = temp.getRoot().getPath();
-
-               // load the configuration
-               File configDirectory = new File(confDirPath);
-               
GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
-
-               
flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum="
 +
-                       zkServer.getConnectString() + 
"@@yarn.application-attempts=" + numberApplicationAttempts +
-                       "@@" + CheckpointingOptions.STATE_BACKEND.key() + 
"=FILESYSTEM" +
-                       "@@" + CheckpointingOptions.CHECKPOINTS_DIRECTORY + "=" 
+ fsStateHandlePath + "/checkpoints" +
-                       "@@" + HighAvailabilityOptions.HA_STORAGE_PATH.key() + 
"=" + fsStateHandlePath + "/recovery");
-
-               ClusterClient<ApplicationId> yarnClusterClient = null;
-
-               final FiniteDuration timeout = new FiniteDuration(2, 
TimeUnit.MINUTES);
-
-               HighAvailabilityServices highAvailabilityServices = null;
-
-               final ClusterSpecification clusterSpecification = new 
ClusterSpecification.ClusterSpecificationBuilder()
-                       .setMasterMemoryMB(768)
-                       .setTaskManagerMemoryMB(1024)
-                       .setNumberTaskManagers(1)
-                       .setSlotsPerTaskManager(1)
-                       .createClusterSpecification();
+       public void testKillJobManager() throws Exception {
+               final Runner clusterRunner = startWithArgs(
+                       new String[]{
+                               "-j", flinkUberjar.getAbsolutePath(),
+                               "-t", flinkLibFolder.getAbsolutePath(),
+                               "-n", "2",
+                               "-jm", "768",
+                               "-tm", "1024",
+                               "-s", "1",
+                               "-nm", "test-cluster",
+                               "-D" + 
TaskManagerOptions.MANAGED_MEMORY_SIZE.key() + "=128",
+                               "-D" + 
YarnConfigOptions.APPLICATION_ATTEMPTS.key() + "=10",
+                               "-D" + HighAvailabilityOptions.HA_MODE.key() + 
"=ZOOKEEPER",
+                               "-D" + 
HighAvailabilityOptions.HA_STORAGE_PATH.key() + "=" + storageDir,
+                               "-D" + 
HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM.key() + "=" + zkQuorum,
+                               "-D" + 
ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY.key() + "=3 s",
+                               "-D" + 
ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS + "=10",
+                               "--detached"
+                       },
+                       "Flink JobManager is now running",
+                       RunTypes.YARN_SESSION);
+
+               // before checking any strings outputted by the CLI, first give 
it time to return
+               clusterRunner.join();
+
+               // actually run a program, otherwise we wouldn't necessarily 
see any TaskManagers
+               // be brought up
+               final File testingJar =
+                       YarnTestBase.findFile("..", new 
YarnTestUtils.TestJarFinder("flink-yarn-tests"));
+               final String job = "org.apache.flink.yarn.testjob.StreamCase";
+
+               Runner jobRunner = startWithArgs(new String[]{"run",
+                       "--detached",
+                       "-c", job,
+                       testingJar.getAbsolutePath(),
+                       "-yD", HighAvailabilityOptions.HA_MODE.key() + 
"=ZOOKEEPER",
+                       "-yD", HighAvailabilityOptions.HA_STORAGE_PATH.key() + 
"=" + storageDir,
+                       "-yD", 
HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM.key() + "=" + zkQuorum,
+               }, "Job has been submitted with JobID", RunTypes.CLI_FRONTEND);
+
+               jobRunner.join();
+
+               while (getRunningContainers() < 3) {
+                       sleep(500);
+               }
 
-               try {
-                       yarnClusterClient = 
flinkYarnClient.deploySessionCluster(clusterSpecification);
-
-                       highAvailabilityServices = 
HighAvailabilityServicesUtils.createHighAvailabilityServices(
-                               yarnClusterClient.getFlinkConfiguration(),
-                               Executors.directExecutor(),
-                               
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
-
-                       final HighAvailabilityServices 
finalHighAvailabilityServices = highAvailabilityServices;
-
-                       new JavaTestKit(actorSystem) {{
-                               for (int attempt = 0; attempt < 
numberKillingAttempts; attempt++) {
-                                       new Within(timeout) {
-                                               @Override
-                                               protected void run() {
-                                                       try {
-                                                               ActorGateway 
gateway = LeaderRetrievalUtils.retrieveLeaderGateway(
-                                                                       
finalHighAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
-                                                                       
actorSystem,
-                                                                       
timeout);
-                                                               ActorGateway 
selfGateway = new AkkaActorGateway(getRef(), gateway.leaderSessionID());
-
-                                                               
gateway.tell(new 
TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), 
selfGateway);
-
-                                                               
expectMsgEquals(Acknowledge.get());
-
-                                                               
gateway.tell(PoisonPill.getInstance());
-                                                       } catch (Exception e) {
-                                                               throw new 
AssertionError("Could not complete test.", e);
-                                                       }
-                                               }
-                                       };
+               final YarnClient yarnClient = getYarnClient();
+               Assert.assertNotNull(yarnClient);
+
+               Assert.assertEquals(1, 
yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING)).size());
+               final ApplicationReport report1 = 
yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING)).get(0);
+               Assert.assertEquals(1, 
report1.getCurrentApplicationAttemptId().getAttemptId());
+
+               final ApplicationId id = report1.getApplicationId();
+
+               waitUntilCondition(
+                       () -> {
+                               final File jmLog = findFile("..", (dir, name) ->
+                                       name.contains("jobmanager.log") && 
dir.getAbsolutePath().contains("_01_")
+                                               && 
dir.getAbsolutePath().contains(LOG_DIR)
+                                               && 
dir.getAbsolutePath().contains(FORMAT.format(id.getId())));
+                               if (jmLog != null) {
+                                       final String jmLogText = 
FileUtils.readFileToString(jmLog);
+                                       final Matcher m = 
PATTERN.matcher(jmLogText);
+                                       // match 4 times, all vertices running
+                                       return m.find() && m.find() && m.find() 
&& m.find();
                                }
+                               return false;
+                       }, TIMEOUT.fromNow());
 
-                               new Within(timeout) {
-                                       @Override
-                                       protected void run() {
-                                               try {
-                                                       ActorGateway gateway = 
LeaderRetrievalUtils.retrieveLeaderGateway(
-                                                               
finalHighAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
-                                                               actorSystem,
-                                                               timeout);
-                                                       ActorGateway 
selfGateway = new AkkaActorGateway(getRef(), gateway.leaderSessionID());
-
-                                                       gateway.tell(new 
TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), 
selfGateway);
-
-                                                       
expectMsgEquals(Acknowledge.get());
-                                               } catch (Exception e) {
-                                                       throw new 
AssertionError("Could not complete test.", e);
-                                               }
-                                       }
-                               };
-
-                       }};
-               } finally {
-                       if (yarnClusterClient != null) {
-                               log.info("Shutting down the Flink Yarn 
application.");
-                               yarnClusterClient.shutDownCluster();
-                               yarnClusterClient.shutdown();
-                       }
+               Runtime.getRuntime().exec(new String[]{
+                       "/bin/sh", "-c", "kill $(ps aux | grep -v bash | grep 
jobmanager | grep -v grep | FS=' \\t' awk '{print $2}')"
 
 Review comment:
   Maybe. The intention here is to cause an external (non-Flink) failure on the JM, 
triggering YARN's restart logic instead of Flink's internal postStop and restart.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to