Eric: In addition to HBaseConverters.scala, examples/src/main/python/hbase_inputformat.py was also updated. FYI
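
For anyone following along, here is a minimal local sketch (not the committed example code) of what that update amounts to on the Python side: the updated HBaseResultToStringConverter hands back one JSON string per cell, newline-separated, and hbase_inputformat.py splits and decodes it. The field layout below is copied from the output Eric posted; the variable names are made up for illustration.

    import json

    # Illustrative converter output for one HBase Result: one JSON object per
    # cell, joined with "\n" (field names taken from the output quoted below).
    raw_value = (
        '{"row": "row1", "columnFamily": "f1", "qualifier": "a", '
        '"timestamp": "1438716994027", "type": "Put", "value": "value1"}\n'
        '{"row": "row1", "columnFamily": "f1", "qualifier": "b", '
        '"timestamp": "1438717004248", "type": "Put", "value": "value2"}'
    )

    # The same split-then-decode step the example performs with
    # flatMapValues(lambda v: v.split("\n")).mapValues(json.loads):
    for cell in (json.loads(line) for line in raw_value.split("\n")):
        print((cell["row"], cell))

With the pre-update converter jar on the classpath, the value is a plain string rather than JSON, which would explain the "No JSON object could be decoded" error further down the thread.
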
On Mon, Aug 10, 2015 at 11:08 AM, Eric Bless <eric.bl...@yahoo.com.invalid> wrote:

> Thank you Gen, the changes to HBaseConverters.scala now appear to be
> returning all column qualifiers, as follows -
>
> (u'row1', {u'qualifier': u'a', u'timestamp': u'1438716994027', u'value': u'value1', u'columnFamily': u'f1', u'type': u'Put', u'row': u'row1'})
> (u'row1', {u'qualifier': u'b', u'timestamp': u'1438717004248', u'value': u'value2', u'columnFamily': u'f1', u'type': u'Put', u'row': u'row1'})
> (u'row2', {u'qualifier': u'', u'timestamp': u'1438717014529', u'value': u'value3', u'columnFamily': u'f1', u'type': u'Put', u'row': u'row2'})
> (u'row3', {u'qualifier': u'', u'timestamp': u'1438717022756', u'value': u'value4', u'columnFamily': u'f1', u'type': u'Put', u'row': u'row3'})
>
> Just to be clear, you referred to "Spark update these two scripts recently."
> Which two scripts were you referencing?
>
>
> On Friday, August 7, 2015 7:59 PM, gen tang <gen.tan...@gmail.com> wrote:
>
> Hi,
>
> In fact, PySpark uses org.apache.spark.examples.pythonconverters
> (./examples/src/main/scala/org/apache/spark/pythonconverters/)
> to transform HBase Result objects into Python strings.
> Spark updated these two scripts recently, but the changes are not yet included
> in the official Spark release, so you are using the new Python script with an
> old jar.
>
> You can clone the latest Spark code from GitHub and build the examples jar
> yourself; then you will get the correct result.
>
> Cheers
> Gen
>
>
> On Sat, Aug 8, 2015 at 5:03 AM, Eric Bless <eric.bl...@yahoo.com.invalid> wrote:
>
> I'm having some difficulty getting the desired results from the Spark Python
> example hbase_inputformat.py. I'm running CDH 5.4, HBase 1.0.0, Spark 1.3.0,
> and Python 2.6.6.
>
> I followed the example to create a test HBase table. Here's the data from the
> table I created -
>
> hbase(main):001:0> scan 'dev_wx_test'
> ROW          COLUMN+CELL
>  row1        column=f1:a, timestamp=1438716994027, value=value1
>  row1        column=f1:b, timestamp=1438717004248, value=value2
>  row2        column=f1:, timestamp=1438717014529, value=value3
>  row3        column=f1:, timestamp=1438717022756, value=value4
> 3 row(s) in 0.2620 seconds
>
> When either of these statements is included -
> hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n"))
> or
> hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n")).countByValue().items()
> only the following is printed; (row1, value2) is not printed:
>
> ((u'row1', u'value1'), 1)
> ((u'row2', u'value3'), 1)
> ((u'row3', u'value4'), 1)
>
> This looks similar to the results in the following post -
> http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-get-column-family-and-qualifier-names-from-hbase-table-td18613.html#a18650
> but it appears the pythonconverter HBaseResultToStringConverter has been
> updated since then.
>
> And this problem will be resolved too.
>
> When the statement
> hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n")).mapValues(json.loads)
> is included, the result is -
> ValueError: No JSON object could be decoded
>
> **************************************************************************************
> Here is more info on this from the log -
>
> Traceback (most recent call last):
>   File "hbase_inputformat.py", line 87, in <module>
>     output = hbase_rdd.collect()
>   File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/rdd.py", line 701, in collect
>   File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/py4j/java_gateway.py", line 538, in __call__
>   File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/py4j/protocol.py", line 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o44.collect.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, stluhdpddev27.monsanto.com): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/worker.py", line 101, in main
>     process()
>   File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/worker.py", line 96, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/serializers.py", line 236, in dump_stream
>     vs = list(itertools.islice(iterator, batch))
>   File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/rdd.py", line 1807, in <lambda>
>   File "/usr/lib64/python2.6/json/__init__.py", line 307, in loads
>     return _default_decoder.decode(s)
>   File "/usr/lib64/python2.6/json/decoder.py", line 319, in decode
>     obj, end = self.raw_decode(s, idx=_w(s, 0).end())
>   File "/usr/lib64/python2.6/json/decoder.py", line 338, in raw_decode
>     raise ValueError("No JSON object could be decoded")
> ValueError: No JSON object could be decoded
>
>         at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135)
>         at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:176)
>         at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>         at scala.Option.foreach(Option.scala:236)
>         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
> Any suggestions would be most welcome.
>
> ****************************************************************
> Below is the code we're running. We did add a few things to the original
> example in our attempts to get it working.
>
> from __future__ import print_function
>
> import sys
> import json
>
> from pyspark import SparkContext
> from pyspark.conf import SparkConf
>
> import os.path
> os.environ["SPARK_HOME"] = "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/"
> conf = (SparkConf().setMaster('local').setAppName('a'))
>
> if __name__ == "__main__":
>     if len(sys.argv) != 3:
>         print("""
>         Usage: hbase_inputformat <host> <table>
>         Run with example jar:
>         ./bin/spark-submit --driver-class-path /path/to/example/jar \
>         /path/to/examples/hbase_inputformat.py <host> <table> [<znode>]
>         Assumes you have some data in HBase already, running on <host>, in <table>
>         optionally, you can specify parent znode for your hbase cluster - <znode>
>         """, file=sys.stderr)
>         exit(-1)
>
>     host = sys.argv[1]
>     table = sys.argv[2]
>     sc = SparkContext(appName="HBaseInputFormat")
>
>     # Other options for configuring scan behavior are available. More information available at
>     # https://github.com/apache/hbase/blob/master/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
>     conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
>     if len(sys.argv) > 3:
>         conf = {"hbase.zookeeper.quorum": host, "zookeeper.znode.parent": sys.argv[3],
>                 "hbase.mapreduce.inputtable": table}
>     keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
>     valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
>
>     hbase_rdd = sc.newAPIHadoopRDD(
>         "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
>         "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
>         "org.apache.hadoop.hbase.client.Result",
>         keyConverter=keyConv,
>         valueConverter=valueConv,
>         conf=conf)
>     hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n")).mapValues(json.loads)
>     # hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n"))
>     # hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n")).countByValue().items()
>
>     output = hbase_rdd.collect()
>     # output = hbase_rdd
>     for (k, v) in output:
>         print((k, v))
>
>     sc.stop()
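
As a follow-up suggestion, here is a hedged sketch (not part of the original example; the app name and the inline sample data are made up to mirror the converter output at the top of the thread) showing how, once the rebuilt examples jar is in place, the per-cell records can be regrouped into one dict per row key, so that row1 shows both the 'a' and 'b' qualifiers:

    from pyspark import SparkContext

    sc = SparkContext(appName="GroupCellsByRowSketch")  # illustrative app name

    # Simulated (row key, cell dict) pairs in the shape the updated converter
    # returns; in the real job these come from the newAPIHadoopRDD +
    # flatMapValues + json.loads pipeline shown above.
    cells = sc.parallelize([
        (u'row1', {u'qualifier': u'a', u'value': u'value1', u'columnFamily': u'f1'}),
        (u'row1', {u'qualifier': u'b', u'value': u'value2', u'columnFamily': u'f1'}),
        (u'row2', {u'qualifier': u'', u'value': u'value3', u'columnFamily': u'f1'}),
        (u'row3', {u'qualifier': u'', u'value': u'value4', u'columnFamily': u'f1'}),
    ])

    # Regroup per row key: collect (qualifier, value) pairs and build a dict per row.
    per_row = (cells
               .map(lambda kv: (kv[0], (kv[1][u'qualifier'], kv[1][u'value'])))
               .groupByKey()
               .mapValues(dict))

    for row_key, quals in sorted(per_row.collect()):
        print((row_key, quals))
    # Expected with the sample data above:
    # (u'row1', {u'a': u'value1', u'b': u'value2'})
    # (u'row2', {u'': u'value3'})
    # (u'row3', {u'': u'value4'})

    sc.stop()

groupByKey is fine at this scale; for very wide rows, aggregateByKey or combineByKey building the dict incrementally would avoid materializing every cell of a row at once.
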