Hello,

My application reads and writes HDFS files both in the jobs and in the driver application.
It works in local cluster mode, but when I submit it to a YARN client and try to use a
HadoopInputFormat (built from an HCatalog request), I get the following error: Delegation
Token can be issued only with kerberos or web authentication (full stack trace below).

Here is the code that I believe causes the error (it is not obvious from the stack trace,
as the nearest point in my own code is "execEnv.execute()"):

public synchronized DataSet<T> readTable(String dbName, String tableName, String filter,
        ExecutionEnvironment cluster, final HiveBeanFactory<T> factory) throws IOException {

    // Log in to Kerberos if needed (via
    // UserGroupInformation.loginUserFromKeytab(getKerberosPrincipal(), getKerberosKeytab()))
    HdfsTools.getFileSystem();

    // Create the M/R job and configure it
    final Job job = Job.getInstance();
    job.setJobName("Flink source for Hive Table " + dbName + "." + tableName);

    // Create the source
    @SuppressWarnings({ "unchecked", "rawtypes" })
    final HadoopInputFormat<NullWritable, DefaultHCatRecord> inputFormat =
        new HadoopInputFormat<NullWritable, DefaultHCatRecord>(
            (InputFormat) HCatInputFormat.setInput(job, dbName, tableName, filter),
            NullWritable.class,
            DefaultHCatRecord.class,
            job);

    final HCatSchema inputSchema = HCatInputFormat.getTableSchema(job.getConfiguration());

    @SuppressWarnings("serial")
    final DataSet<T> dataSet = cluster
        // Read the table
        .createInput(inputFormat)
        // Map to bean (the key is useless)
        .flatMap(new FlatMapFunction<Tuple2<NullWritable, DefaultHCatRecord>, T>() {
            @Override
            public void flatMap(Tuple2<NullWritable, DefaultHCatRecord> value, Collector<T> out)
                    throws Exception { // NOPMD
                final T record = factory.fromHive(value.f1, inputSchema);
                if (record != null) {
                    out.collect(record);
                }
            }
        })
        .returns(beanClass);

    return dataSet;
}

Maybe I need to explicitly obtain a token on each node during the initialization of the
HadoopInputFormat (by overriding configure())? That would be difficult, since the keytab
file is on the driver's local drive… Something like the sketch below is what I have in mind.
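
Just to illustrate the idea, here is a rough sketch of what I mean (not working code: the
class name, principal, and keytab path are my own placeholders, and it would still require
the keytab to be present at that path on every node):

import java.io.IOException;

import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;

/**
 * Sketch only: wraps the HCatalog input format and logs in to Kerberos from a
 * keytab when configure() runs, before any HDFS access is attempted.
 * Principal and keytab path are hypothetical placeholders.
 */
public class KerberosHadoopInputFormat extends HadoopInputFormat<NullWritable, DefaultHCatRecord> {

    private static final long serialVersionUID = 1L;

    private final String principal;   // e.g. "alinz@MY.REALM" (placeholder)
    private final String keytabPath;  // e.g. "/usr/users/alinz/alinz.keytab" (placeholder)

    public KerberosHadoopInputFormat(InputFormat<NullWritable, DefaultHCatRecord> wrapped, Job job,
            String principal, String keytabPath) throws IOException {
        super(wrapped, NullWritable.class, DefaultHCatRecord.class, job);
        this.principal = principal;
        this.keytabPath = keytabPath;
    }

    @Override
    public void configure(Configuration parameters) {
        try {
            // Log in before the wrapped format starts listing splits / reading data.
            UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
        } catch (IOException e) {
            throw new RuntimeException("Kerberos login from keytab failed", e);
        }
        super.configure(parameters);
    }
}

The plain HadoopInputFormat in readTable() would then be replaced by an instance of this
class (keeping the same unchecked cast on the HCatInputFormat).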

Stack trace:

Found YARN properties file /usr/lib/flink/bin/../conf/.yarn-properties
Using JobManager address from YARN properties bt1svlmw.bpa.bouyguestelecom.fr/172.19.115.52:50494
Secure Hadoop environment setup detected. Running in secure context.
2015:08:20 15:04:17 (main) - INFO - com.bouygtel.kuberasdk.main.Application.mainMethod - Début traitement
15:04:18,005 INFO  org.apache.hadoop.security.UserGroupInformation - Login successful for user alinz using keytab file /usr/users/alinz/alinz.keytab
15:04:20,139 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
Error : Execution Kubera KO : java.lang.IllegalStateException: Error while executing Flink application
com.bouygtel.kuberasdk.main.ApplicationBatch.execCluster(ApplicationBatch.java:84)
com.bouygtel.kubera.main.segment.ApplicationGeoSegment.batchExec(ApplicationGeoSegment.java:68)
com.bouygtel.kuberasdk.main.ApplicationBatch.exec(ApplicationBatch.java:51)
com.bouygtel.kuberasdk.main.Application.mainMethod(Application.java:81)
com.bouygtel.kubera.main.segment.MainGeoSmooth.main(MainGeoSmooth.java:44)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
org.apache.flink.client.program.Client.run(Client.java:315)
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:873)
org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:870)
org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:50)
java.security.AccessController.doPrivileged(Native Method)
javax.security.auth.Subject.doAs(Subject.java:415)
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:47)
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:870)
org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)

Caused by: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job dddaf104260eb0f56ff336727ceeb49e (KUBERA-GEO-BRUT2SEGMENT)
org.apache.flink.client.program.Client.run(Client.java:413)
org.apache.flink.client.program.Client.run(Client.java:356)
org.apache.flink.client.program.Client.run(Client.java:349)
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
com.bouygtel.kuberasdk.main.ApplicationBatch.execCluster(ApplicationBatch.java:80)
com.bouygtel.kubera.main.segment.ApplicationGeoSegment.batchExec(ApplicationGeoSegment.java:68)
com.bouygtel.kuberasdk.main.ApplicationBatch.exec(ApplicationBatch.java:51)
com.bouygtel.kuberasdk.main.Application.mainMethod(Application.java:81)
com.bouygtel.kubera.main.segment.MainGeoSmooth.main(MainGeoSmooth.java:44)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
org.apache.flink.client.program.Client.run(Client.java:315)
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:873)
org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:870)
org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:50)
java.security.AccessController.doPrivileged(Native Method)
javax.security.auth.Subject.doAs(Subject.java:415)
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:47)
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:870)
org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)

Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job dddaf104260eb0f56ff336727ceeb49e (KUBERA-GEO-BRUT2SEGMENT)
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:594)
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
akka.actor.Actor$class.aroundReceive(Actor.scala:465)
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
akka.actor.ActorCell.invoke(ActorCell.scala:487)
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
akka.dispatch.Mailbox.run(Mailbox.scala:221)
akka.dispatch.Mailbox.exec(Mailbox.scala:231)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Delegation Token can be issued only with kerberos or web authentication
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:7609)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:522)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:977)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2033)

org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:162)
org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:469)
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:534)
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
akka.actor.Actor$class.aroundReceive(Actor.scala:465)
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
akka.actor.ActorCell.invoke(ActorCell.scala:487)
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
akka.dispatch.Mailbox.run(Mailbox.scala:221)
akka.dispatch.Mailbox.exec(Mailbox.scala:231)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): Delegation Token can be issued only with kerberos or web authentication
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:7609)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:522)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:977)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2033)

org.apache.hadoop.ipc.Client.call(Client.java:1468)
org.apache.hadoop.ipc.Client.call(Client.java:1399)
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
com.sun.proxy.$Proxy14.getDelegationToken(Unknown Source)
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getDelegationToken(ClientNamenodeProtocolTranslatorPB.java:909)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
com.sun.proxy.$Proxy15.getDelegationToken(Unknown Source)
org.apache.hadoop.hdfs.DFSClient.getDelegationToken(DFSClient.java:1029)
org.apache.hadoop.hdfs.DistributedFileSystem.getDelegationToken(DistributedFileSystem.java:1355)
org.apache.hadoop.fs.FileSystem.collectDelegationTokens(FileSystem.java:529)
org.apache.hadoop.fs.FileSystem.addDelegationTokens(FileSystem.java:507)
org.apache.hadoop.hdfs.DistributedFileSystem.addDelegationTokens(DistributedFileSystem.java:2041)
org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:121)
org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)
org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:80)
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:205)
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
org.apache.hive.hcatalog.mapreduce.HCatBaseInputFormat.getSplits(HCatBaseInputFormat.java:157)
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:140)
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:51)
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:146)
org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:469)
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:534)
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
akka.actor.Actor$class.aroundReceive(Actor.scala:465)
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
akka.actor.ActorCell.invoke(ActorCell.scala:487)
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
akka.dispatch.Mailbox.run(Mailbox.scala:221)
akka.dispatch.Mailbox.exec(Mailbox.scala:231)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



Do you have any clue?

Best regards,
Arnaud


