Sorry, I don't know why the code and error are not visible.
The error is :
The program finished with the following exception:
/org.apache.flink.client.deployment.ClusterDeploymentException: Could not
deploy Yarn job cluster.
at
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:82)
at
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:239)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
at flink.SubmitDemo.submit(SubmitDemo.java:75)
at flink.SubmitDemo.main(SubmitDemo.java:50)
Caused by:
org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException:
The YARN application unexpectedly switched to state FAILED during
deployment.
Diagnostics from YARN: Application application_1526888270443_0090 failed 2
times due to AM Container for appattempt_1526888270443_0090_000002 exited
with exitCode: -1000
For more detailed output, check application tracking
page:http://cluster1:8088/cluster/app/application_1526888270443_0090Then,
click on links to logs of each attempt.
Diagnostics: File
file:/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp
does not exist
java.io.FileNotFoundException: File
file:/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp
does not exist
at
org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
at
org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
at
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
at
org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)
at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:63)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:361)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:358)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:62)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Failing this attempt. Failing the application.
If log aggregation is enabled on your cluster, use this command to further
investigate the issue:
yarn logs -applicationId application_1526888270443_0090
at
org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1059)
at
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:532)
at
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:75)
... 5 more/
and my code like :
/public class SubmitDemo {
private final String ENV_CONF = "/usr/ndp/current/yarn_client/conf";
private final String FLINK_CONF = "/home/demo/flink-1.5.1/conf";
private static final String JAR_FILE =
"/home/demo/lz/flink1.5_demo-1.16-SNAPSHOT-jar-with-dependencies.jar";
public static void main(String[] args) {
SubmitDemo demo = new SubmitDemo();
demo.before();
List<String> parameters = new ArrayList<>();
parameters.add("run");
parameters.add("-d");
parameters.add("-m");
parameters.add("yarn-cluster");
parameters.add("-ynm");
parameters.add("lz_test_alone");
parameters.add("-yn");
parameters.add("4");
parameters.add("-ytm");
parameters.add("4096");
parameters.add("-yjm");
parameters.add("1024");
parameters.add("-c");
parameters.add("flink.Demo");
parameters.add(JAR_FILE);
try {
demo.submit(parameters.toArray(new String[parameters.size()]));
} catch (Exception e) {
e.printStackTrace();
}
}
public void submit(String[] args) throws Exception {
final String configurationDirectory = ENV_CONF;
File configFIle = new File(FLINK_CONF);
final Configuration flinkConfiguration =
GlobalConfiguration.loadConfiguration(configFIle.getAbsolutePath());
FlinkYarnSessionCli cli = new
FlinkYarnSessionCli(flinkConfiguration, configurationDirectory, "y",
"yarn");
final List<CustomCommandLine<?>> customCommandLines =
CliFrontend.loadCustomCommandLines(
flinkConfiguration,
configurationDirectory);
CliFrontend testFrontend = new CliFrontend(flinkConfiguration,
customCommandLines);
//submit
testFrontend.parseParameters(args);
CommandLine commandLine = CliFrontendParser.parse(
CliFrontendParser.getRunCommandOptions(),
args,
true);
final ApplicationId clusterId = cli.getClusterId(commandLine);
System.out.println("ApplicationId=" + clusterId.toString());
}
// SET HADOOP ENV
private void before() {
Map<String, String> newenv = Maps.newHashMap();
newenv.put("HADOOP_CONF_DIR", ENV_CONF);
newenv.put("YARN_CONF_DIR", ENV_CONF);
try {
Class<?> processEnvironmentClass =
Class.forName("java.lang.ProcessEnvironment");
Field theEnvironmentField =
processEnvironmentClass.getDeclaredField("theEnvironment");
theEnvironmentField.setAccessible(true);
Map<String, String> env = (Map<String, String>)
theEnvironmentField.get(null);
env.putAll(newenv);
Field theCaseInsensitiveEnvironmentField =
processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
theCaseInsensitiveEnvironmentField.setAccessible(true);
Map<String, String> cienv = (Map<String, String>)
theCaseInsensitiveEnvironmentField.get(null);
cienv.putAll(newenv);
} catch (NoSuchFieldException e) {
Class[] classes = Collections.class.getDeclaredClasses();
Map<String, String> env = System.getenv();
for (Class cl : classes) {
if
("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
Field field = cl.getDeclaredField("m");
field.setAccessible(true);
Object obj = field.get(env);
Map<String, String> map = (Map<String, String>) obj;
map.clear();
map.putAll(newenv);
}
}
}
}
}/
the error is file not found
"/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp
"
but I can foud this file .
Previously, I thought it was an environment variable problem and added "
before() ". This method still reported an error
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/