I doubt anyone would deploy HBase 0.98.x on hadoop-1. Looks like the hadoop2 profile should be made the default.
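For reference, the hadoop2 counterpart of the hbase-hadoop1 profile quoted below (the one -Dhbase.profile=hadoop2 activates) should look roughly like this sketch; the exact hbase.version in examples/pom.xml may differ from what is shown here:

    <profile>
      <id>hbase-hadoop2</id>
      <activation>
        <property>
          <name>hbase.profile</name>
          <value>hadoop2</value>
        </property>
      </activation>
      <properties>
        <!-- assumed value; set it to the HBase release you actually run, e.g. 0.98.8-hadoop2 -->
        <hbase.version>0.98.7-hadoop2</hbase.version>
      </properties>
    </profile>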
Cheers

On Tue, Jan 6, 2015 at 9:49 AM, Max Xu <max...@twosigma.com> wrote:

Awesome. Thanks again Ted. I remember there is a block in the pom.xml under the examples folder that defaults the hbase version to hadoop1. I figured this out last time when I built Spark 1.1.1 but forgot this time.

    <profile>
      <id>hbase-hadoop1</id>
      <activation>
        <property>
          <name>!hbase.profile</name>
        </property>
      </activation>
      <properties>
        <hbase.version>0.98.7-hadoop1</hbase.version>
      </properties>
    </profile>

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Tuesday, January 06, 2015 12:39 PM
To: Max Xu
Cc: user@spark.apache.org
Subject: Re: Saving data to Hbase hung in Spark streaming application with Spark 1.2.0

The default profile is hbase-hadoop1, so you need to specify -Dhbase.profile=hadoop2.

See SPARK-1297

Cheers

On Tue, Jan 6, 2015 at 9:11 AM, Max Xu <max...@twosigma.com> wrote:

Thanks Ted. You are right, hbase-site.xml is in the classpath. But previously I had it in the classpath too and the app worked fine. I believe I found the problem. I built Spark 1.2.0 myself and forgot to change the dependency hbase version to 0.98.8-hadoop2, which is the version I use. When I used spark-examples-1.1.1-hadoop2.5.2.jar from the Spark 1.1.1 build (built with hbase 0.98.8-hadoop2), the problem went away. I'll try to run the app again after rebuilding Spark 1.2.0 with 0.98.8-hadoop2.

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Tuesday, January 06, 2015 11:56 AM
To: Max Xu
Cc: user@spark.apache.org
Subject: Re: Saving data to Hbase hung in Spark streaming application with Spark 1.2.0

I assume hbase-site.xml is in the classpath.

Can you try the code snippet in a standalone program to see if the problem persists?

Cheers

On Tue, Jan 6, 2015 at 6:42 AM, Max Xu <max...@twosigma.com> wrote:

Hi all,

I have a Spark streaming application that ingests data from a Kafka topic and persists the received data to HBase. It works fine with Spark 1.1.1 in YARN cluster mode. Basically, I use the following code to persist each partition of each RDD to HBase:

    @Override
    public void call(Iterator<Metric> it) throws Exception {
        HConnection hConnection = null;
        HTableInterface htable = null;
        try {
            hConnection = HConnectionManager.createConnection(_conf.value());
            // one table per day, e.g. <prefix>_2015_01_06
            htable = hConnection.getTable(_tablePrefix + "_"
                    + new SimpleDateFormat("yyyy_MM_dd").format(new Date()));
            // buffer puts client-side and flush them in one batch
            htable.setAutoFlush(false, true);
            while (it.hasNext()) {
                Metric metric = it.next();
                htable.put(_put.call(metric));
            }
            htable.flushCommits();
        } finally {
            try {
                if (htable != null) {
                    htable.close();
                }
            } catch (Exception e) {
                System.err.println("error closing htable");
                System.err.println(e.toString());
            }
            try {
                if (hConnection != null) {
                    hConnection.close();
                }
            } catch (Exception e) {
                System.err.println("error closing hConnection");
                System.err.println(e.toString());
            }
        }
    }

I use a Kafka receiver to create the input stream:

    KafkaUtils.createStream(jssc, zkQuorum, group, topicMap,
            StorageLevel.MEMORY_AND_DISK_SER());

With 1.2.0, receiving from Kafka still works normally. I tried both KafkaReceiver and ReliableKafkaReceiver; both can get data from Kafka without a problem. However, the application just didn't save data to HBase.
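For completeness, the call(Iterator<Metric>) function shown above is invoked once per partition, roughly like the sketch below (the "metrics" stream variable and the anonymous Function/VoidFunction wiring are simplified placeholders, not my exact classes):

    // metrics is assumed to be a JavaDStream<Metric> built from the Kafka stream above
    metrics.foreachRDD(new Function<JavaRDD<Metric>, Void>() {
        @Override
        public Void call(JavaRDD<Metric> rdd) throws Exception {
            rdd.foreachPartition(new VoidFunction<Iterator<Metric>>() {
                @Override
                public void call(Iterator<Metric> it) throws Exception {
                    // the HBase put/flush loop shown above runs here, on the executor
                }
            });
            return null;
        }
    });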
The streaming page of the Spark UI showed it stuck processing the first batch.

The Executor threads stayed in TIMED_WAITING state:

    Thread 54: Executor task launch worker-0 (TIMED_WAITING)
    java.lang.Thread.sleep(Native Method)
    org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegionInMeta(HConnectionManager.java:1296)
    org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1090)
    org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1047)
    org.apache.hadoop.hbase.client.AsyncProcess.findDestLocation(AsyncProcess.java:365)
    org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:310)
    org.apache.hadoop.hbase.client.HTable.backgroundFlushCommits(HTable.java:971)
    org.apache.hadoop.hbase.client.HTable.doPut(HTable.java:954)
    org.apache.hadoop.hbase.client.HTable.put(HTable.java:915)
    com.xxx.spark.streaming.JavaKafkaSparkHbase$WriteFunction.persist(JavaKafkaSparkHbase.java:125)
    com.xxx.spark.streaming.PersistFunction$1.call(PersistFunction.java:42)
    com.xxx.spark.streaming.PersistFunction$1.call(PersistFunction.java:35)
    org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195)
    org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195)
    org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773)
    org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773)
    org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
    org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
    org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    org.apache.spark.scheduler.Task.run(Task.scala:56)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    java.lang.Thread.run(Thread.java:745)

The KafkaMessageHandler thread is in WAITING state:

    Thread 70: KafkaMessageHandler-0 (WAITING)
    sun.misc.Unsafe.park(Native Method)
    java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
    java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
    java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    kafka.consumer.ConsumerIterator.makeNext(Unknown Source)
    kafka.consumer.ConsumerIterator.makeNext(Unknown Source)
    kafka.utils.IteratorTemplate.maybeComputeNext(Unknown Source)
    kafka.utils.IteratorTemplate.hasNext(Unknown Source)
    org.apache.spark.streaming.kafka.KafkaReceiver$MessageHandler.run(KafkaInputDStream.scala:132)
    java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    java.util.concurrent.FutureTask.run(FutureTask.java:262)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    java.lang.Thread.run(Thread.java:745)

Does anybody have similar issues or know how to solve this? I am using Hadoop 2.5.2 with HBase 0.98.8.

Thanks very much,

Max
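For anyone reproducing this outside Spark (as Ted suggested above), a minimal standalone check of the same put path would look roughly like the sketch below; the table name, column family, and values are placeholders, and the target table must already exist:

    import java.text.SimpleDateFormat;
    import java.util.Date;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HConnection;
    import org.apache.hadoop.hbase.client.HConnectionManager;
    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class HBasePutCheck {
        public static void main(String[] args) throws Exception {
            // picks up hbase-site.xml from the classpath, same as the streaming job
            Configuration conf = HBaseConfiguration.create();
            HConnection conn = HConnectionManager.createConnection(conf);
            try {
                // placeholder table name following the per-day naming used above
                HTableInterface table = conn.getTable("metrics_"
                        + new SimpleDateFormat("yyyy_MM_dd").format(new Date()));
                try {
                    Put put = new Put(Bytes.toBytes("test-row"));
                    put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
                    table.put(put);
                    table.flushCommits();
                    System.out.println("put succeeded");
                } finally {
                    table.close();
                }
            } finally {
                conn.close();
            }
        }
    }

If this hangs in locateRegionInMeta the same way, the problem is in the HBase client/cluster combination (for example an hbase-hadoop1 client jar talking to a hadoop2 cluster) rather than in the streaming code itself.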