Hello Team,

My scenario is to load data from a Kafka producer topic into HBase using the Spark Streaming API. Our cluster is Kerberos-authenticated. When we run the KafkaToHbase.java below, we hit the error that follows the code. Please let me know if anyone has an idea of what can be done.
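For reference, the four command-line arguments are consumed by the code as <zkQuorum> <consumerGroup> <topicList> <numThreads>, so we submit the job roughly as follows (the jar name and argument values here are placeholders, not our exact command):

    spark-submit --class com.spark.example.KafkaToHbase kafkaToHbase.jar <zkQuorum> <consumerGroup> <topic1,topic2> <numThreads>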
package com.spark.example;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class KafkaToHbase {

    private final static String rawTableName = "kafkatest1";
    //final static String user = "a_rakesh.samin...@suddenlink.cequel3.com";
    //final static String keyPath = "/home/a_rakesh.samineni/a_rakesh.samineni.keytab";

    @SuppressWarnings({ "serial", "deprecation" })
    public static void main(String[] args) throws Exception {

        // Authenticating the Kerberos principal (first attempt; the login now
        // happens inside the HBase configuration block below instead)
        // try {
        //     UserGroupInformation.loginUserFromKeytab(user, keyPath);
        // } catch (Exception e) {
        //     e.printStackTrace();
        // }

        // 1. Create the Spark streaming context with a 10 second batch size
        SparkConf sparkConf = new SparkConf().setAppName("SparkStreamingTest");
        sparkConf.set("spark.driver.allowMultipleContexts", "true");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

        // 2. Create the Kafka receiver stream; args are <zkQuorum> <consumerGroup> <topics> <numThreads>
        int numThreads = Integer.parseInt(args[3]);
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topics = args[2].split(",");
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }
        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(
                ssc, args[0], args[1], topicMap, StorageLevels.MEMORY_AND_DISK_SER);
        // 3. Create the connection with HBase and log in from the keytab
        Configuration config = null;
        try {
            System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
            //UserGroupInformation.loginUserFromKeytab(user, keyPath);
            config = HBaseConfiguration.create();
            config.set("hadoop.security.authentication", "kerberos");
            config.set("hbase.rpc.protection", "privacy");
            config.addResource(new org.apache.hadoop.fs.Path("/etc/hbase/conf/core-site.xml"));
            config.addResource(new org.apache.hadoop.fs.Path("/etc/hbase/conf/hbase-site.xml"));
            config.set("hbase.zookeeper.quorum", "sdldalplhdm03,sdldalplhdm02,sdldalplhdm01");
            config.set("hbase.zookeeper.property.clientPort", "2181");
            UserGroupInformation.setConfiguration(config);
            UserGroupInformation.loginUserFromKeytab("a_nikhil.gopishe...@suddenlink.cequel3.com",
                    "/home/a_nikhil.gopishetti/a_nikhil.gopishetti.keytab");
            //config.set("hbase.master", "10.48.210.248:60010");
            HBaseAdmin.checkHBaseAvailable(config); // verify HBase is reachable from the driver
        } catch (MasterNotRunningException e) {
            System.out.println("HBase is not running!");
            System.exit(1);
        } catch (Exception ce) {
            ce.printStackTrace();
        }
        //config.set(TableInputFormat.INPUT_TABLE, rawTableName);

        // Extract the message payload from each (key, value) pair
        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        // Turn each message into a Put against cf12:firstColumn (random row key, for testing)
        JavaPairDStream<ImmutableBytesWritable, Put> hbasePuts = lines.mapToPair(
                new PairFunction<String, ImmutableBytesWritable, Put>() {
                    @SuppressWarnings("deprecation")
                    public Tuple2<ImmutableBytesWritable, Put> call(String line) {
                        Put put = new Put(Bytes.toBytes(Math.random()));
                        put.add(Bytes.toBytes("cf12"), Bytes.toBytes("firstColumn"), Bytes.toBytes(line));
                        return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
                    }
                });

        // 4. New Hadoop API job configuration; write each micro-batch through TableOutputFormat
        final Job newAPIJobConfiguration1 = Job.getInstance(config);
        hbasePuts.foreachRDD(new Function<JavaPairRDD<ImmutableBytesWritable, Put>, Void>() {
            public Void call(JavaPairRDD<ImmutableBytesWritable, Put> hbasePutJavaPairRDD) throws Exception {
                newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, rawTableName);
                newAPIJobConfiguration1.setOutputFormatClass(TableOutputFormat.class);
                hbasePutJavaPairRDD.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration());
                return null;
            }
        });

        ssc.start();
        //ssc.awaitTerminationOrTimeout(10000000);
        ssc.awaitTermination();
        ssc.stop();
    }
}

*Error:* Below are the log lines that are blocking us:
15/11/01 21:22:23 INFO zookeeper.ClientCnxn: Opening socket connection to server sdldalplhdm03.suddenlink.cequel3.com/10.48.210.248:2181. Will not attempt to authenticate using SASL (unknown error)
15/11/01 21:22:23 INFO zookeeper.ClientCnxn: Socket connection established, initiating session, client: /10.48.210.241:52395, server: sdldalplhdm03.suddenlink.cequel3.com/10.48.210.248:2181
15/11/01 21:22:23 INFO zookeeper.ClientCnxn: Session establishment complete on server sdldalplhdm03.suddenlink.cequel3.com/10.48.210.248:2181, sessionid = 0x44fbd94675cf6c7, negotiated timeout = 60000
15/11/01 21:22:24 INFO mapreduce.TableOutputFormat: Created table instance for kafkatest1
15/11/01 21:22:24 WARN security.UserGroupInformation: PriviledgedActionException as:a_nikhil.gopishetti (auth:SIMPLE) cause:javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
15/11/01 21:22:24 WARN ipc.AbstractRpcClient: Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
*15/11/01 21:22:24 ERROR ipc.AbstractRpcClient: SASL authentication failed. The most likely cause is missing or invalid credentials. Consider 'kinit'.*
*javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]*
    at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
    at org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:605)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$600(RpcClientImpl.java:154)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:731)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:728)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:728)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.writeRequest(RpcClientImpl.java:881)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.tracedWriteRequest(RpcClientImpl.java:850)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1174)
    at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:216)
    at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:300)
    at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.get(ClientProtos.java:31865)
    at org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRowOrBefore(ProtobufUtil.java:1580)
    at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegionInMeta(ConnectionManager.java:1294)
    at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1126)
    at org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:369)
    at org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:320)
    at org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:206)
    at org.apache.hadoop.hbase.client.BufferedMutatorImpl.close(BufferedMutatorImpl.java:158)
    at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.close(TableOutputFormat.java:111)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:1007)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)
    at sun.security.jgss.krb5.Krb5InitCredential.getInstance(Krb5InitCredential.java:147)
    at sun.security.jgss.krb5.Krb5MechFactory.getCredentialElement(Krb5MechFactory.java:121)
    at sun.security.jgss.krb5.Krb5MechFactory.getMechanismContext(Krb5MechFactory.java:187)
    at sun.security.jgss.GSSManagerImpl.getMechanismContext(GSSManagerImpl.java:223)
    at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:212)
    at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
    at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:193)
    ... 31 more
15/11/01 21:22:27 WARN security.UserGroupInformation: PriviledgedActionException as:a_nikhil.gopishetti (auth:SIMPLE) cause:javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
15/11/01 21:22:27 WARN ipc.AbstractRpcClient: Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
15/11/01 21:22:27 ERROR ipc.AbstractRpcClient: SASL authentication failed. The most likely cause is missing or invalid credentials. Consider 'kinit'.
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]

Regards,
Nik.
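P.S. One detail I notice in the log: the failing task runs as a_nikhil.gopishetti (auth:SIMPLE), and the stack trace is from an executor (Executor$TaskRunner), so I suspect the loginUserFromKeytab() done on the driver never reaches the executors where saveAsNewAPIHadoopDataset() actually writes. Below is a rough, untested sketch of what we are considering instead of the foreachRDD block above: shipping the keytab to the executors (e.g. with --files /home/a_nikhil.gopishetti/a_nikhil.gopishetti.keytab on spark-submit) and logging in inside each partition before writing with a plain HTable. The foreachPartition/HTable approach is our own assumption, not something we have verified:

// Additional imports this sketch would need:
// import java.util.Iterator;
// import org.apache.hadoop.hbase.client.HTable;
// import org.apache.spark.api.java.function.VoidFunction;

hbasePuts.foreachRDD(new Function<JavaPairRDD<ImmutableBytesWritable, Put>, Void>() {
    public Void call(JavaPairRDD<ImmutableBytesWritable, Put> rdd) {
        rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<ImmutableBytesWritable, Put>>>() {
            public void call(Iterator<Tuple2<ImmutableBytesWritable, Put>> puts) throws Exception {
                // This body runs on the executor, so log in there. Assumes
                // hbase-site.xml is on the executor classpath and the keytab
                // shipped via --files is in the task's working directory.
                Configuration conf = HBaseConfiguration.create();
                conf.set("hadoop.security.authentication", "kerberos");
                UserGroupInformation.setConfiguration(conf);
                UserGroupInformation.loginUserFromKeytab(
                        "a_nikhil.gopishe...@suddenlink.cequel3.com",
                        "a_nikhil.gopishetti.keytab");
                // Write this partition's Puts directly instead of going
                // through TableOutputFormat
                HTable table = new HTable(conf, rawTableName);
                while (puts.hasNext()) {
                    table.put(puts.next()._2());
                }
                table.close();
            }
        });
        return null;
    }
});

Would something along these lines be the right direction, or is there a cleaner way (we also wondered about the --principal/--keytab options of spark-submit on YARN)?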