Since the error comes from the HBase client and is completely unrelated to Kafka, you will have better luck asking on the HBase mailing list.
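
That said, one thing in the trace may be worth flagging before you repost: the SASL
failure happens inside an executor task (note the Executor$TaskRunner.run and
ResultTask frames in the stack), while loginUserFromKeytab() is only called in main(),
i.e. on the driver, and the UGI warning reports auth:SIMPLE at the moment of failure.
So the executor JVMs that actually run TableOutputFormat apparently never hold a
Kerberos TGT. Below is a rough, untested sketch of one way to authenticate on the
executor side, replacing the mapToPair + saveAsNewAPIHadoopDataset section; it assumes
the keytab is readable at the same path on every worker node, the principal, keytab
path, table and column-family values are simply copied from your code, and the HBase
folks may well suggest a cleaner route (delegation tokens, for instance).

// Extra imports on top of what KafkaToHbase already has:
//   java.util.Iterator, org.apache.spark.api.java.JavaRDD,
//   org.apache.spark.api.java.function.VoidFunction,
//   org.apache.hadoop.hbase.client.HTable
lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
    public Void call(JavaRDD<String> rdd) throws Exception {
        rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
            public void call(Iterator<String> records) throws Exception {
                // This block runs inside the executor JVM, so the keytab login
                // (and hence the TGT) lives where the HBase RPCs are actually made.
                Configuration conf = HBaseConfiguration.create();
                conf.addResource(new org.apache.hadoop.fs.Path("/etc/hbase/conf/hbase-site.xml"));
                conf.set("hadoop.security.authentication", "kerberos");
                UserGroupInformation.setConfiguration(conf);
                UserGroupInformation.loginUserFromKeytab(
                        "a_nikhil.gopishe...@suddenlink.cequel3.com",
                        "/home/a_nikhil.gopishetti/a_nikhil.gopishetti.keytab");
                HTable table = new HTable(conf, "kafkatest1");
                try {
                    while (records.hasNext()) {
                        String line = records.next();
                        // Row-key design is up to you; the line itself is used here
                        // only to keep the sketch self-contained.
                        Put put = new Put(Bytes.toBytes(line));
                        put.add(Bytes.toBytes("cf12"), Bytes.toBytes("firstColumn"),
                                Bytes.toBytes(line));
                        table.put(put);
                    }
                } finally {
                    table.close();
                }
            }
        });
        return null;
    }
});

Whatever you end up doing, the login has to happen in the JVM that opens the HBase
connection; the keytab login in main() only covers the driver, which is why
HBaseAdmin.checkHBaseAvailable() succeeds while the streaming write fails.
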
On Mon, Nov 2, 2015 at 9:16 AM, Nikhil Gs <gsnikhil1432...@gmail.com> wrote:

> Hello Team,
>
> My scenario is to load the data from producer topic to Hbase by using Spark
> API. Our cluster is Kerberos authenticated and when we running the below
> kafkaToHbase.java and the error which I am facing is also below. Let me
> know if anyone can have any idea what can be done.
>
> package com.spark.example;
>
> import java.util.HashMap;
> import java.util.Map;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.hbase.HBaseConfiguration;
> import org.apache.hadoop.hbase.MasterNotRunningException;
> import org.apache.hadoop.hbase.client.HBaseAdmin;
> import org.apache.hadoop.hbase.client.Put;
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.security.UserGroupInformation;
> import org.apache.hadoop.hbase.mapred.TableOutputFormat;
> import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
> import org.apache.hadoop.hbase.util.Bytes;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.StorageLevels;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.api.java.function.PairFunction;
> import org.apache.spark.streaming.Durations;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.streaming.api.java.JavaPairDStream;
> import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.streaming.kafka.KafkaUtils;
>
> import scala.Tuple2;
>
> public class KafkaToHbase {
>
>     private final static String rawTableName = "kafkatest1";
>
>     //final static String user = "a_rakesh.samin...@suddenlink.cequel3.com";
>     //sfinal static String keyPath = "/home/a_rakesh.samineni/a_rakesh.samineni.keytab";
>
>     @SuppressWarnings({ "serial", "deprecation" })
>     public static void main(String[] args) throws Exception {
>         // // Authenticating Kerberos principal
>         // System.out.println("Principal Authentication: ");
>         // try {
>         //     UserGroupInformation.loginUserFromKeytab(user, keyPath);
>         // }
>         // catch(Exception e){
>         //     e.printStackTrace();
>         // }
>
>         //1. Create the spark streaming context with a 10 second batch size
>         SparkConf sparkConf = new SparkConf().setAppName("SparkStreamingTest");
>         sparkConf.set("spark.driver.allowMultipleContexts", "true");
>         JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
>
>         int numThreads = Integer.parseInt(args[3]);
>         Map<String, Integer> topicMap = new HashMap<String, Integer>();
>         String[] topics = args[2].split(",");
>         for (String topic: topics) {
>             topicMap.put(topic, numThreads);
>         }
>
>         //System.out.println("creating stream..");
>         JavaPairReceiverInputDStream<String, String> messages =
>             KafkaUtils.createStream(ssc, args[0], args[1], topicMap,
>                 StorageLevels.MEMORY_AND_DISK_SER);
>         //messages.print();
>         //System.out.println("messages"+messages);
>
>         //3. create connection with HBase
>         Configuration config = null;
>         try {
>             System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
>             //System.out.println("in hbase configuraiton");
>             //UserGroupInformation.loginUserFromKeytab(user, keyPath);
>             config = HBaseConfiguration.create();
>             config.set("hadoop.security.authentication", "kerberos");
>             config.set("hbase.rpc.protection", "privacy");
>             config.addResource(new org.apache.hadoop.fs.Path("/etc/hbase/conf/core-site.xml"));
>             config.addResource(new org.apache.hadoop.fs.Path("/etc/hbase/conf/hbase-site.xml"));
>             config.set("hbase.zookeeper.quorum", "sdldalplhdm03, sdldalplhdm02, sdldalplhdm01");
>             config.set("hbase.zookeeper.property.clientPort", "2181");
>             UserGroupInformation.setConfiguration(config);
>             //System.out.println("after hbase configuration");
>             UserGroupInformation.loginUserFromKeytab("a_nikhil.gopishe...@suddenlink.cequel3.com",
>                 "/home/a_nikhil.gopishetti/a_nikhil.gopishetti.keytab");
>             //config.set("hbase.master", "10.48.210.248:60010");
>             HBaseAdmin.checkHBaseAvailable(config);
>             //System.out.println("HBase is running!");
>         }
>         catch (MasterNotRunningException e) {
>             System.out.println("HBase is not running!");
>             System.exit(1);
>         } catch (Exception ce) {
>             ce.printStackTrace();
>         }
>
>         //config.set(TableInputFormat.INPUT_TABLE, rawTableName);
>
>         //4. new Hadoop API configuration
>         JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
>             public String call(Tuple2<String, String> tuple2) {
>                 return tuple2._2();
>             }
>         });
>
>         JavaPairDStream<ImmutableBytesWritable, Put> hbasePuts = lines.mapToPair(
>             new PairFunction<String, ImmutableBytesWritable, Put>() {
>
>                 @SuppressWarnings("deprecation")
>                 public Tuple2<ImmutableBytesWritable, Put> call(String line) {
>                     Put put = new Put(Bytes.toBytes(Math.random()));
>                     put.add(Bytes.toBytes("cf12"), Bytes.toBytes("firstColumn"), Bytes.toBytes(line));
>                     return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
>                 }
>             });
>
>         final Job newAPIJobConfiguration1 = Job.getInstance(config);
>         hbasePuts.foreachRDD(new Function<JavaPairRDD<ImmutableBytesWritable, Put>, Void>() {
>             public Void call(JavaPairRDD<ImmutableBytesWritable, Put> hbasePutJavaPairRDD) throws Exception {
>
>                 newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, rawTableName);
>                 newAPIJobConfiguration1.setOutputFormatClass(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.class);
>
>                 hbasePutJavaPairRDD.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration());
>                 //hbasePutJavaPairRDD.saveAsNewAPIHadoopDataset(config);
>                 return null;
>             }
>         });
>
>         ssc.start();
>         //ssc.awaitTerminationOrTimeout(10000000);
>         ssc.awaitTermination();
>         ssc.stop();
>     }
>
> }
>
> *Error:*
>
> Below are the log lines which is blocking us.
>
> 15/11/01 21:22:23 INFO zookeeper.ClientCnxn: Opening socket connection to server
> sdldalplhdm03.suddenlink.cequel3.com/10.48.210.248:2181. Will not attempt to
> authenticate using SASL (unknown error)
>
> 15/11/01 21:22:23 INFO zookeeper.ClientCnxn: Socket connection established,
> initiating session, client: /10.48.210.241:52395, server:
> sdldalplhdm03.suddenlink.cequel3.com/10.48.210.248:2181
>
> 15/11/01 21:22:23 INFO zookeeper.ClientCnxn: Session establishment complete on
> server sdldalplhdm03.suddenlink.cequel3.com/10.48.210.248:2181,
> sessionid = 0x44fbd94675cf6c7, negotiated timeout = 60000
>
> 15/11/01 21:22:24 INFO mapreduce.TableOutputFormat: Created table instance for kafkatest1
>
> 15/11/01 21:22:24 WARN security.UserGroupInformation: PriviledgedActionException
> as:a_nikhil.gopishetti (auth:SIMPLE) cause:javax.security.sasl.SaslException:
> GSS initiate failed [Caused by GSSException: No valid credentials provided
> (Mechanism level: Failed to find any Kerberos tgt)]
>
> 15/11/01 21:22:24 WARN ipc.AbstractRpcClient: Exception encountered while
> connecting to the server : javax.security.sasl.SaslException: GSS initiate
> failed [Caused by GSSException: No valid credentials provided (Mechanism
> level: Failed to find any Kerberos tgt)]
>
> *15/11/01 21:22:24 ERROR ipc.AbstractRpcClient: SASL authentication failed.
> The most likely cause is missing or invalid credentials. Consider 'kinit'.*
>
> *javax.security.sasl.SaslException: GSS initiate failed [Caused by
> GSSException: No valid credentials provided (Mechanism level: Failed to
> find any Kerberos tgt)]*
>         at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
>         at org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)
>         at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:605)
>         at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$600(RpcClientImpl.java:154)
>         at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:731)
>         at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:728)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:415)
>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
>         at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:728)
>         at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.writeRequest(RpcClientImpl.java:881)
>         at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.tracedWriteRequest(RpcClientImpl.java:850)
>         at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1174)
>         at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:216)
>         at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:300)
>         at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.get(ClientProtos.java:31865)
>         at org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRowOrBefore(ProtobufUtil.java:1580)
>         at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegionInMeta(ConnectionManager.java:1294)
>         at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1126)
>         at org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:369)
>         at org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:320)
>         at org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:206)
>         at org.apache.hadoop.hbase.client.BufferedMutatorImpl.close(BufferedMutatorImpl.java:158)
>         at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.close(TableOutputFormat.java:111)
>         at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:1007)
>         at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: GSSException: No valid credentials provided (Mechanism level:
> Failed to find any Kerberos tgt)
>         at sun.security.jgss.krb5.Krb5InitCredential.getInstance(Krb5InitCredential.java:147)
>         at sun.security.jgss.krb5.Krb5MechFactory.getCredentialElement(Krb5MechFactory.java:121)
>         at sun.security.jgss.krb5.Krb5MechFactory.getMechanismContext(Krb5MechFactory.java:187)
>         at sun.security.jgss.GSSManagerImpl.getMechanismContext(GSSManagerImpl.java:223)
>         at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:212)
>         at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
>         at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:193)
>         ... 31 more
>
> 15/11/01 21:22:27 WARN security.UserGroupInformation: PriviledgedActionException
> as:a_nikhil.gopishetti (auth:SIMPLE) cause:javax.security.sasl.SaslException:
> GSS initiate failed [Caused by GSSException: No valid credentials provided
> (Mechanism level: Failed to find any Kerberos tgt)]
>
> 15/11/01 21:22:27 WARN ipc.AbstractRpcClient: Exception encountered while
> connecting to the server : javax.security.sasl.SaslException: GSS initiate
> failed [Caused by GSSException: No valid credentials provided (Mechanism
> level: Failed to find any Kerberos tgt)]
>
> 15/11/01 21:22:27 ERROR ipc.AbstractRpcClient: SASL authentication failed.
> The most likely cause is missing or invalid credentials. Consider 'kinit'.
>
> javax.security.sasl.SaslException: GSS initiate failed [Caused by
> GSSException: No valid credentials provided (Mechanism level: Failed to
> find any Kerberos tgt)]
>
> Regards,
> Nik.