[ https://issues.apache.org/jira/browse/FLINK-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16648131#comment-16648131 ]
ASF GitHub Bot commented on FLINK-10516: ---------------------------------------- yanyan300300 closed pull request #6828: [FLINK-10516] [yarn] fix YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup URL: https://github.com/apache/flink/pull/6828 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 462682f7e5f..f3dd27b5ef0 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -27,6 +27,7 @@ import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.configuration.WebOptions; +import org.apache.flink.core.fs.FileSystem; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; @@ -67,6 +68,7 @@ import org.slf4j.LoggerFactory; import java.io.File; +import java.io.IOException; import java.util.Collections; import java.util.Map; import java.util.concurrent.Callable; @@ -160,6 +162,13 @@ protected int run(String[] args) { final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties); + // configure the filesystems + try { + FileSystem.initialize(flinkConfig); + } catch (IOException e) { + throw new IOException("Error while configuring the filesystems.", e); + } + File f = new File(currDir, Utils.KEYTAB_FILE_NAME); if (remoteKeytabPrincipal != null && f.exists()) { String keytabPath = f.getAbsolutePath(); diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java index b15374b2a4b..929dbdbce3b 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java @@ -19,9 +19,12 @@ package org.apache.flink.yarn; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; import org.apache.flink.util.OperatingSystem; +import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.junit.Assume; @@ -29,9 +32,14 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +56,7 @@ import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; @@ -56,6 +65,8 @@ /** * Tests for the {@link YarnApplicationMasterRunner}. */ +@PrepareForTest(FileSystem.class) +@RunWith(PowerMockRunner.class) public class YarnApplicationMasterRunnerTest { private static final Logger LOG = LoggerFactory.getLogger(YarnApplicationMasterRunnerTest.class); @@ -109,4 +120,37 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { taskManagerConf, workingDirectory, taskManagerMainClass, LOG); assertEquals("file", ctx.getLocalResources().get("flink.jar").getResource().getScheme()); } + + @Test + public void testRunAndInitializeFileSystem() throws Exception { + // Mock necessary system variables + Map<String, String> map = new HashMap<String, String>(System.getenv()); + map.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, "foo"); + // Create dynamic properties to be used in the Flink configuration + map.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, "myKey=myValue"); + CommonTestUtils.setEnv(map); + + // Create a temporary flink-conf.yaml and to be deleted on JVM exits + File currDir = new File(System.getenv().get(ApplicationConstants.Environment.PWD.key())); + String path = String.format("%s/%s.%s", currDir, "flink-conf", "yaml"); + File f = new File(path); + f.createNewFile(); + f.deleteOnExit(); + + // Mock FileSystem.initialize() + PowerMockito.mockStatic(FileSystem.class); + PowerMockito.doNothing().when(FileSystem.class); + FileSystem.initialize(any(Configuration.class)); + + String[] args = new String[5]; + YarnApplicationMasterRunner yarnApplicationMasterRunner = new YarnApplicationMasterRunner(); + yarnApplicationMasterRunner.run(args); + + // Verify FileSystem.initialize() is invoked with the correct Flink config + ArgumentCaptor<Configuration> propertiesCaptor = + ArgumentCaptor.forClass(Configuration.class); + PowerMockito.verifyStatic(); + FileSystem.initialize(propertiesCaptor.capture()); + assertEquals("myValue", propertiesCaptor.getValue().getString("myKey", "")); + } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink > Configuration during setup > ------------------------------------------------------------------------------------------------------- > > Key: FLINK-10516 > URL: https://issues.apache.org/jira/browse/FLINK-10516 > Project: Flink > Issue Type: Bug > Components: YARN > Affects Versions: 1.4.0, 1.5.0, 1.6.0, 1.7.0 > Reporter: Shuyi Chen > Assignee: Shuyi Chen > Priority: Major > Labels: pull-request-available > Fix For: 1.7.0, 1.6.2, 1.5.5 > > > Will add a fix, and refactor YarnApplicationMasterRunner to add a unittest to > prevent future regression. -- This message was sent by Atlassian JIRA (v7.6.3#76005)