Hi guys,
Good day.
I rebuilt Flink from source and specified the vendor-specific Hadoop version. It works well when I submit a streaming application without the '-d' (--detached) option, as follows:
bin/flink run -m yarn-cluster -yqu root.streaming -yn 5 -yjm 2048 -ytm 3096 -ynm CJVFormatter -ys 2 -c yidian.data.cjv.Formatter ./cjv-formatter-1.0-SNAPSHOT.jar --conf ./formatter.conf
But if I add the '-d' (--detached) option, an org.apache.flink.client.deployment.ClusterDeploymentException is thrown in the CLI, as follows:
bin/flink run -d -m yarn-cluster -yqu root.streaming -yn 5 -yjm 2048 -ytm 3096 -ynm CJVFormatter -ys 2 -c yidian.data.cjv.Formatter ./cjv-formatter-1.0-SNAPSHOT.jar --conf ./formatter.conf
-------------------------------- Exception start --------------------------------
The program finished with the following exception:
org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
	at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:82)
	at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:238)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$16(CliFrontend.java:1126)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1544777537685_0068 failed 2 times due to AM Container for appattempt_1544777537685_0068_000002 exited with exitCode: 1
For more detailed output, check application tracking page: http://103-8-200-sh-100-F07.yidian.com:8088/proxy/application_1544777537685_0068/ Then, click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_e03_1544777537685_0068_02_000001
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
	at org.apache.hadoop.util.Shell.runCommand(Shell.java:543)
	at org.apache.hadoop.util.Shell.run(Shell.java:460)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:720)
	at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:210)
	at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
	at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Container exited with a non-zero exit code 1
Failing this attempt. Failing the application.
If log aggregation is enabled on your cluster, use this command to further investigate the issue:
yarn logs -applicationId application_1544777537685_0068
	at org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1065)
	at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:545)
	at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:75)
	... 9 more
2019-01-07 17:08:55,463 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cancelling deployment from Deployment Failure Hook
2019-01-07 17:08:55,464 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Killing YARN application
2019-01-07 17:08:55,471 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deleting files in
-------------------------------- End --------------------------------
My cluster has log aggregation enabled, so I executed the following command:
yarn logs -applicationId application_1544777537685_0068
The log shows the following detail:
-------------------------------- Exception start --------------------------------
2019-01-07 17:08:49,385 INFO  akka.remote.Remoting - Remoting started; listening on addresses :[akka.tcp://fl...@120-14-20-sh-1037-b06.yidian.com:20998]
2019-01-07 17:08:49,391 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Actor system started at akka.tcp://fl...@120-14-20-sh-1037-b06.yidian.com:20998
2019-01-07 17:08:51,108 INFO  org.apache.flink.runtime.blob.FileSystemBlobStore - Creating highly available BLOB storage directory at hdfs://amethyst/flink/flink17/ha//application_1544777537685_0068/blob
2019-01-07 17:08:51,138 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Shutting YarnJobClusterEntrypoint down with application status FAILED. Diagnostics java.lang.ClassCastException: org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$MkdirsRequestProto cannot be cast to com.google.protobuf.Message
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
	at com.sun.proxy.$Proxy9.mkdirs(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:539)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy10.mkdirs(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:3075)
	at org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:3042)
	at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:956)
	at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:952)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:952)
	at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:945)
	at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1817)
	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:170)
	at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:61)
	at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:126)
	at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:92)
	at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:121)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:304)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:261)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:207)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$58(ClusterEntrypoint.java:163)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:162)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:517)
	at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:102)
2019-01-07 17:08:51,142 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopping Akka RPC service.
2019-01-07 17:08:51,149 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down remote daemon.
2019-01-07 17:08:51,151 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut down; proceeding with flushing remote transports.
2019-01-07 17:08:51,170 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting shut down.
2019-01-07 17:08:51,196 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC service.
2019-01-07 17:08:51,196 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Could not start cluster entrypoint YarnJobClusterEntrypoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint YarnJobClusterEntrypoint.
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:181)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:517)
	at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:102)
Caused by: java.lang.ClassCastException: org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$MkdirsRequestProto cannot be cast to com.google.protobuf.Message
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
	at com.sun.proxy.$Proxy9.mkdirs(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:539)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy10.mkdirs(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:3075)
	at org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:3042)
	at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:956)
	at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:952)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:952)
	at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:945)
	at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1817)
	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:170)
	at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:61)
	at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:126)
	at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:92)
	at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:121)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:304)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:261)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:207)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$58(ClusterEntrypoint.java:163)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:162)
	... 2 more
-------------------------------- End --------------------------------
When I rebuilt Flink from source, I used the command:
mvn clean install -DskipTests -Dhadoop.version=2.6.0-cdh5.5.0
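For reference, here is how I can double-check that the build actually resolved the CDH artifacts rather than the vanilla Hadoop ones (the build-target path below is an assumption about where my build put the distribution):

```shell
# Which hadoop-common did Maven resolve for the build?
mvn dependency:tree -Dincludes=org.apache.hadoop:hadoop-common

# Are there any non-CDH Hadoop jars in the built distribution's lib/?
ls build-target/lib/ | grep -i hadoop | grep -v cdh
```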
My cluster details (output of `hadoop version`):
Hadoop 2.6.0-cdh5.5.0
Subversion http://github.com/cloudera/hadoop -r fd21232cef7b8c1f536965897ce20f50b83ee7b2
Compiled by jenkins on 2015-11-09T20:39Z
Compiled with protoc 2.5.0
From source with checksum 98e07176d1787150a6a9c087627562c
This command was run using /opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p0.8/jars/hadoop-common-2.6.0-cdh5.5.0.jar
I am wondering: since I specified the vendor-specific Hadoop version, how did this happen? Or are there any extra parameters that should be added?
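In case it helps with the diagnosis: as far as I understand, a "cannot be cast to com.google.protobuf.Message" ClassCastException usually means two copies of the protobuf classes are visible to different classloaders. A rough check (assuming unzip is available on the node) for jars that bundle their own copy could look like:

```shell
# List every jar on the Hadoop classpath that bundles its own copy of
# com.google.protobuf.Message; more than the expected protobuf jar
# would suggest a classpath conflict.
for jar in $(hadoop classpath | tr ':' '\n' | grep '\.jar$' | sort -u); do
  if unzip -l "$jar" 2>/dev/null | grep -q 'com/google/protobuf/Message.class'; then
    echo "$jar"
  fi
done
```

The Flink lib/ directory and the jars shipped to the YARN containers could be scanned the same way by substituting their paths.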
It would be appreciated if anybody could help me figure it out.
Thank you
Best Regards