Hi Wei,

Did you build Flink with Maven 3.2.5, as recommended in the documentation [1]? Also, did you use the -Pvendor-repos flag to add the Cloudera repository when building [2]?
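In case it helps, a vendor-specific build along the lines of [2] would look roughly like the following; the Hadoop version is taken from the cluster details further down this thread, so treat it as an assumption:

  mvn clean install -DskipTests -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.5.0

A ClassCastException like "X cannot be cast to com.google.protobuf.Message" usually indicates that two different protobuf versions (or two copies loaded by different classloaders) end up on the classpath, so it can also be worth grepping the shipped jars for an unshaded protobuf. The paths and jar names below are only an example; they depend on where your built distribution ended up and on the Scala version:

  jar tf build-target/lib/flink-dist_2.11-1.7.1.jar | grep 'com/google/protobuf'
  jar tf ./cjv-formatter-1.0-SNAPSHOT.jar | grep 'com/google/protobuf'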
Best,
Gary

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/flinkDev/building.html#build-flink
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.7/flinkDev/building.html#vendor-specific-versions

On Tue, Jan 8, 2019 at 5:17 AM Wei Sun <rdn...@w3sun.com> wrote:
> Hi Timo,
>
> Good day!
>
> Thank you for your help! This issue has been solved with the rebuilt Flink
> version. But I found that it does not work with the 'Apache Flink 1.7.1 only'
> version, even if I configure the classpath with
> export HADOOP_CLASSPATH=`hadoop classpath`. I will check it later.
> Thanks again.
>
> Best Regards
> Wei
>
> ------------------ Original ------------------
> From: "Timo Walther" <twal...@apache.org>
> Date: Jan 8, 2019
> To: "user" <user@flink.apache.org>
> Cc: "gary" <g...@apache.org>
> Subject: Re: Building Flink from source according to vendor-specific version but causes protobuf conflict
>
> Hi Wei,
>
> did you play around with the classloading options mentioned here [1]? The -d
> option might impact how classes are loaded when the job is deployed on the
> cluster.
>
> I will loop in Gary, who might know more about the YARN behavior.
>
> Regards,
> Timo
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/yarn_setup.html#user-jars--classpath
>
>
> On 07.01.19 at 10:33, Wei Sun wrote:
>
> Hi guys,
>
> Good day.
>
> I rebuilt Flink from source and specified the vendor-specific Hadoop
> version. It works well when I just submit a streaming application without
> the '-d' (--detached) option, as follows:
>
> bin/flink run -m yarn-cluster -yqu root.streaming -yn 5 -yjm 2048 -ytm 3096 -ynm CJVFormatter -ys 2 -c yidian.data.cjv.Formatter ./cjv-formatter-1.0-SNAPSHOT.jar --conf ./formatter.conf
>
> But if I add the '-d' (--detached) option, an
> org.apache.flink.client.deployment.ClusterDeploymentException is thrown
> to the CLI:
>
> bin/flink run -d -m yarn-cluster -yqu root.streaming -yn 5 -yjm 2048 -ytm 3096 -ynm CJVFormatter -ys 2 -c yidian.data.cjv.Formatter ./cjv-formatter-1.0-SNAPSHOT.jar --conf ./formatter.conf
>
> -------------------------------- Exception start --------------------------------
> The program finished with the following exception:
>
> org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
>     at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:82)
>     at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:238)
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$16(CliFrontend.java:1126)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment.
> Diagnostics from YARN: Application application_1544777537685_0068 failed 2 times due to AM Container for appattempt_1544777537685_0068_000002 exited with exitCode: 1
> For more detailed output, check application tracking page: http://103-8-200-sh-100-F07.yidian.com:8088/proxy/application_1544777537685_0068/ Then, click on links to logs of each attempt.
> Diagnostics: Exception from container-launch.
> Container id: container_e03_1544777537685_0068_02_000001
> Exit code: 1
> Stack trace: ExitCodeException exitCode=1:
>     at org.apache.hadoop.util.Shell.runCommand(Shell.java:543)
>     at org.apache.hadoop.util.Shell.run(Shell.java:460)
>     at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:720)
>     at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:210)
>     at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
>     at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
> Container exited with a non-zero exit code 1
> Failing this attempt. Failing the application.
> If log aggregation is enabled on your cluster, use this command to further investigate the issue:
> yarn logs -applicationId application_1544777537685_0068
>     at org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1065)
>     at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:545)
>     at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:75)
>     ... 9 more
> 2019-01-07 17:08:55,463 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor  - Cancelling deployment from Deployment Failure Hook
> 2019-01-07 17:08:55,464 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor  - Killing YARN application
> 2019-01-07 17:08:55,471 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor  - Deleting files in
> -------------------------------- End --------------------------------
>
> My cluster has log aggregation enabled, so I executed the following command:
> yarn logs -applicationId application_1544777537685_0068
> The aggregated log shows:
>
> -------------------------------- Exception start --------------------------------
> 2019-01-07 17:08:49,385 INFO  akka.remote.Remoting  - Remoting started; listening on addresses :[akka.tcp://fl...@120-14-20-sh-1037-b06.yidian.com:20998]
> 2019-01-07 17:08:49,391 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils  - Actor system started at akka.tcp://fl...@120-14-20-sh-1037-b06.yidian.com:20998
> 2019-01-07 17:08:51,108 INFO  org.apache.flink.runtime.blob.FileSystemBlobStore  - Creating highly available BLOB storage directory at hdfs://amethyst/flink/flink17/ha//application_1544777537685_0068/blob
> 2019-01-07 17:08:51,138 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Shutting YarnJobClusterEntrypoint down with application status FAILED. Diagnostics
> java.lang.ClassCastException: org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$MkdirsRequestProto cannot be cast to com.google.protobuf.Message
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>     at com.sun.proxy.$Proxy9.mkdirs(Unknown Source)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:539)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>     at com.sun.proxy.$Proxy10.mkdirs(Unknown Source)
>     at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:3075)
>     at org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:3042)
>     at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:956)
>     at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:952)
>     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>     at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:952)
>     at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:945)
>     at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1817)
>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:170)
>     at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:61)
>     at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:126)
>     at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:92)
>     at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:121)
>     at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:304)
>     at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:261)
>     at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:207)
>     at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$58(ClusterEntrypoint.java:163)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:162)
>     at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:517)
>     at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:102)
> .
> 2019-01-07 17:08:51,142 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Stopping Akka RPC service.
> 2019-01-07 17:08:51,149 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator  - Shutting down remote daemon.
> 2019-01-07 17:08:51,151 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator  - Remote daemon shut down; proceeding with flushing remote transports.
> 2019-01-07 17:08:51,170 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator  - Remoting shut down.
> 2019-01-07 17:08:51,196 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Stopped Akka RPC service.
> 2019-01-07 17:08:51,196 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Could not start cluster entrypoint YarnJobClusterEntrypoint.
> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint YarnJobClusterEntrypoint.
>     at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:181)
>     at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:517)
>     at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:102)
> Caused by: java.lang.ClassCastException: org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$MkdirsRequestProto cannot be cast to com.google.protobuf.Message
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>     at com.sun.proxy.$Proxy9.mkdirs(Unknown Source)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:539)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>     at com.sun.proxy.$Proxy10.mkdirs(Unknown Source)
>     at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:3075)
>     at org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:3042)
>     at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:956)
>     at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:952)
>     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>     at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:952)
>     at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:945)
>     at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1817)
>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:170)
>     at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:61)
>     at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:126)
>     at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:92)
>     at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:121)
>     at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:304)
>     at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:261)
>     at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:207)
>     at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$58(ClusterEntrypoint.java:163)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:162)
>     ... 2 more
>
> -------------------------------- End --------------------------------
>
> When I rebuilt Flink from source, I used the command:
> mvn clean install -DskipTests -Dhadoop.version=2.6.0-cdh5.5.0
>
> My cluster details:
> Hadoop 2.6.0-cdh5.5.0
> Subversion http://github.com/cloudera/hadoop -r fd21232cef7b8c1f536965897ce20f50b83ee7b2
> Compiled by jenkins on 2015-11-09T20:39Z
> Compiled with protoc 2.5.0
> From source with checksum 98e07176d1787150a6a9c087627562c
> This command was run using /opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p0.8/jars/hadoop-common-2.6.0-cdh5.5.0.jar
>
> I am wondering: I have specified the vendor-specific Hadoop version, so how
> did this happen? Or are there any extra parameters that should be added?
>
> It would be appreciated if anybody could help me figure it out.
>
> Thank you
> Best Regards
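Regarding the difference with '-d' and the user-jar classpath options Timo linked (yarn_setup.html#user-jars--classpath): the position of the user jar on the job cluster's classpath can also be overridden per submission via a dynamic property, without rebuilding anything. A minimal sketch, reusing the submission command from above; DISABLED is just one of the documented values (ORDER, FIRST, LAST, DISABLED), and none of them is guaranteed to resolve the protobuf clash:

  # try a different user-jar placement for the detached per-job cluster
  bin/flink run -d -m yarn-cluster \
    -yD yarn.per-job-cluster.include-user-jar=DISABLED \
    -yqu root.streaming -yn 5 -yjm 2048 -ytm 3096 -ynm CJVFormatter -ys 2 \
    -c yidian.data.cjv.Formatter ./cjv-formatter-1.0-SNAPSHOT.jar --conf ./formatter.conf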