+Gordon, could you please have a look at this? You probably know Kafka best by 
now and have also worked on security-related topics for a while.

I’m afraid I’m not much help here but I’m hoping Gordon can help.

Best,
Aljoscha
> On 21. Apr 2017, at 12:46, Bruno Michelin Rakotondranaivo 
> <bruno.michelin.rakotondrana...@ericsson.com> wrote:
> 
> Hi,
>  
> With Flink 1.2.0, I want to consume data from a secured Kafka 0.10 cluster 
> over the SASL_PLAINTEXT protocol, using a login/password from a JAAS file, 
> and store it on HDFS in a Kerberized cluster with user ‘hive’ as the 
> Kerberos principal.
>  
> Checkpointing is enabled and the state backend is ‘filesystem’ on HDFS.
>  
> An error occurs when the job tries to initialize checkpoints. The app uses 
> JAAS authentication instead of Kerberos authentication to write to HDFS.
>  
> 15:32:56,300 INFO  org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Flat Map -> Flat Map -> Sink: Unnamed (1/1) (373063aff1529c7e2ba362ad30c8b03c) switched from RUNNING to FAILED.
> java.lang.Exception: Error while triggering checkpoint 1 for Source: Custom Source -> Flat Map -> Flat Map -> Sink: Unnamed (1/1)
>                 at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1120)
>                 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>                 at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>                 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>                 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>                 at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not perform checkpoint 1 for operator Source: Custom Source -> Flat Map -> Flat Map -> Sink: Unnamed (1/1).
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:534)
>                 at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1111)
>                 ... 5 more
> Caused by: java.io.IOException: The given file URI (hdfs://mynamenode:8020/user/hive/flink-apps/my-app/checkpoints) points to the HDFS NameNode at mynamenode:8020, but the File System could not be initialized with that address.
>                 at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:334)
>                 at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:288)
>                 at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310)
>                 at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
>                 at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:105)
>                 at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:172)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.createStreamFactory(StreamTask.java:1155)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1137)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1076)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:641)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:586)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:529)
>                 ... 6 more
> Caused by: java.lang.NullPointerException
>                 at org.apache.kafka.common.security.plain.PlainSaslServer$PlainSaslServerFactory.getMechanismNames(PlainSaslServer.java:163)
>                 at org.apache.hadoop.security.SaslRpcServer$FastSaslServerFactory.<init>(SaslRpcServer.java:378)
>                 at org.apache.hadoop.security.SaslRpcServer.init(SaslRpcServer.java:183)
>                 at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:568)
>                 at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
>                 at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
>                 at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
>                 at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:668)
>                 at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:604)
>                 at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:148)
>                 at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:320)
>                 ... 17 more
>  
>  
> What have I misunderstood? 
> How can I use JAAS together with Kerberos?
>  
> Thanks in advance
>  
> MR
