+Gordon Could you please have a look at this? You probably know Kafka best by now and have also worked on security related stuff for a while now.
I’m afraid I’m not much help here but I’m hoping Gordon can help. Best, Aljoscha > On 21. Apr 2017, at 12:46, Bruno Michelin Rakotondranaivo > <bruno.michelin.rakotondrana...@ericsson.com> wrote: > > Hi, > > With flink-1.2.0, I want to consume datas from secured kafka 0.10 with > SASL_PLAINTEXT protocol using login/pwd from a JAAS file and store them on > HDFS in a kerberized cluster with user ‘hive’ as kerberos principal login. > > Checkpointing is enabled and states are back end on HDFS ‘filesystem’. > > There is an error when the job want to initialize checkpoints. The app uses > JAAS authentication instead of Kerberos one to write on HDFS. > > 15:32:56,300 INFO org.apache.flink.runtime.taskmanager.Task > - Source: Custom Source -> Flat Map -> Flat Map -> Sink: Unnamed (1/1) > (373063aff1529c7e2ba362ad30c8b03c) switched from RUNNING to FAILED. > java.lang.Exception: Error while triggering checkpoint 1 for Source: Custom > Source -> Flat Map -> Flat Map -> Sink: Unnamed (1/1) > at > org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1120) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) > at java.util.concurrent.FutureTask.run(FutureTask.java:262) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.Exception: Could not perform checkpoint 1 for operator > Source: Custom Source -> Flat Map -> Flat Map -> Sink: Unnamed (1/1). > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:534) > at > org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1111) > ... 5 more > Caused by: java.io.IOException: The given file URI > (hdfs://mynamenode:8020/user/hive/flink-apps/my-app/checkpoints > <hdfs://mynamenode:8020/user/hive/flink-apps/my-app/checkpoints>) points to > the HDFS NameNode at mynamenode:8020, but the File System could not be > initialized with that address. > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:334) > at > org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:288) > at > org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310) > at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:105) > at > org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:172) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.createStreamFactory(StreamTask.java:1155) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1137) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1076) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:641) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:586) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:529) > ... 6 more > Caused by: java.lang.NullPointerException > at > org.apache.kafka.common.security.plain.PlainSaslServer$PlainSaslServerFactory.getMechanismNames(PlainSaslServer.java:163) > at > org.apache.hadoop.security.SaslRpcServer$FastSaslServerFactory.<init>(SaslRpcServer.java:378) > at > org.apache.hadoop.security.SaslRpcServer.init(SaslRpcServer.java:183) > at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:568) > at > org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418) > at > org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314) > at > org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176) > at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:668) > at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:604) > at > org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:148) > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:320) > ... 17 more > > > What I have misunderstand? > How can I use JAAS in/ or with Kerberos? > > Thanks in advance > > MR