Thank you Gen, the changes to HBaseConverters.scala look to now be returning 
all column qualifiers, as follows - 

(u'row1', {u'qualifier': u'a', u'timestamp': u'1438716994027', u'value': 
u'value1', u'columnFamily': u'f1', u'type': u'Put', u'row': u'row1'})
(u'row1', {u'qualifier': u'b', u'timestamp': u'1438717004248', u'value': 
u'value2', u'columnFamily': u'f1', u'type': u'Put', u'row': u'row1'})
(u'row2', {u'qualifier': u'', u'timestamp': u'1438717014529', u'value': 
u'value3', u'columnFamily': u'f1', u'type': u'Put', u'row': u'row2'})
(u'row3', {u'qualifier': u'', u'timestamp': u'1438717022756', u'value': 
u'value4', u'columnFamily': u'f1', u'type': u'Put', u'row': u'row3'})
Just to be clear, you refer to "Spark update these two scripts recently.". What 
two scripts were you referencing? 


     On Friday, August 7, 2015 7:59 PM, gen tang <gen.tan...@gmail.com> wrote:
   

 Hi,
In fact, Pyspark use 
org.apache.spark.examples.pythonconverters(./examples/src/main/scala/org/apache/spark/pythonconverters/)
 to transform object of Hbase result to python string.Spark update these two 
scripts recently. However, they are not included in the official release of 
spark. So you are trying to use this new python script with old jar. 
You can clone the newest code of spark from github and build examples jar. Then 
you can get correct result.
CheersGen

On Sat, Aug 8, 2015 at 5:03 AM, Eric Bless <eric.bl...@yahoo.com.invalid> wrote:

I’m having some difficulty getting the desired results fromthe Spark Python 
example hbase_inputformat.py. I’m running with CDH5.4, hbaseVersion 1.0.0, 
Spark v 1.3.0 Using Python version 2.6.6 I followed the example to create a 
test HBase table. Here’sthe data from the table I created – hbase(main):001:0> 
scan 'dev_wx_test'ROW                      COLUMN+CELLrow1                    
column=f1:a, timestamp=1438716994027, value=value1row1                    
column=f1:b, timestamp=1438717004248, value=value2row2                    
column=f1:, timestamp=1438717014529, value=value3row3                    
column=f1:, timestamp=1438717022756, value=value43 row(s) in 0.2620 seconds 
When either of these statements are included -“hbase_rdd = 
hbase_rdd.flatMapValues(lambda v:v.split("\n"))”  or “hbase_rdd = 
hbase_rdd.flatMapValues(lambda v:v.split("\n")).countByValue().items()” the 
result is - We only get the following printed; (row1, value2) is notprinted: 
        ((u'row1', u'value1'), 1)        ((u'row2', u'value3'), 1)        
((u'row3', u'value4'), 1) This looks like similar results to the following post 
Ifound 
-http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-get-column-family-and-qualifier-names-from-hbase-table-td18613.html#a18650but
 it appears the pythonconverterHBaseResultToStringConverter has been updated 
since then.
And this problem will be resolved too. 
 When the statement “hbase_rdd = hbase_rdd.flatMapValues(lambda 
v:v.split("\n")).mapValues(json.loads)” is included, the result is – 
ValueError: No JSON object could be decoded 
**************************************************************************************
 Here is more info on this from the log – Traceback (most recent call last):  
File"hbase_inputformat.py", line 87, in <module>    output =hbase_rdd.collect() 
 
File"/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/rdd.py",line
 701, in collect  
File"/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/py4j/java_gateway.py",line
 538, in __call__  File 
"/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/py4j/protocol.py",line
 300, in get_return_valuepy4j.protocol.Py4JJavaError: An erroroccurred while 
calling o44.collect.: org.apache.spark.SparkException: Jobaborted due to stage 
failure: Task 0 in stage 1.0 failed 4 times, most recentfailure: Lost task 0.3 
in stage 1.0 (TID 4, 
stluhdpddev27.monsanto.com):org.apache.spark.api.python.PythonException: 
Traceback (most recent call last):  File 
"/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/worker.py",line
 101, in main    process()  
File"/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/worker.py",line
 96, in process   serializer.dump_stream(func(split_index, iterator), outfile)  
File"/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/serializers.py",line
 236, in dump_stream    vs =list(itertools.islice(iterator, batch))  
File"/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/rdd.py",line
 1807, in <lambda>  File"/usr/lib64/python2.6/json/__init__.py", line 307, in 
loads    return_default_decoder.decode(s)  
File"/usr/lib64/python2.6/json/decoder.py", line 319, in decode    obj, end 
=self.raw_decode(s, idx=_w(s, 0).end())  File 
"/usr/lib64/python2.6/json/decoder.py",line 338, in raw_decode    
raiseValueError("No JSON object could be decoded")ValueError: No JSON object 
could bedecoded        at 
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135)       
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:176)    
   at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)       
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)       at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:244)       at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)       at 
org.apache.spark.scheduler.Task.run(Task.scala:64)       at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)       at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
      
atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
       at java.lang.Thread.run(Thread.java:745) Driver stacktrace:       at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
       
atorg.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
       
atorg.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
       
atscala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)    
   at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)     
  
atorg.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
       at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
       at scala.Option.foreach(Option.scala:236)       
atorg.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
       at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
       
atorg.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) Any 
suggestions would be most welcome. 
****************************************************************Below is the 
code we’re running. We did add a few things tothe original example in our 
attempts to get it working.  from __future__ import print_function import 
sysimport json from pyspark import SparkContextfrom pyspark.conf import 
SparkConf import os.pathos.environ["SPARK_HOME"] 
="/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/"conf = 
(SparkConf().setMaster('local').setAppName('a')) if __name__ == "__main__":   
if len(sys.argv) != 3:      print("""       Usage:hbase_inputformat <host> 
<table>       Run with example jar:       ./bin/spark-submit--driver-class-path 
/path/to/example/jar \      /path/to/examples/hbase_inputformat.py <host> 
<table> [<znode>]       Assumes you have somedata in HBase already, running on 
<host>, in <table>         optionally,you can specify parent znode for your 
hbase cluster - <znode>       """,file=sys.stderr)       exit(-1) host = 
sys.argv[1]table = sys.argv[2]sc = SparkContext(appName="HBaseInputFormat") # 
Other options for configuring scan behavior are available.More information 
available at# 
https://github.com/apache/hbase/blob/master/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.javaconf
 = {"hbase.zookeeper.quorum": host,"hbase.mapreduce.inputtable": table}if 
len(sys.argv) > 3:    conf ={"hbase.zookeeper.quorum": host, 
"zookeeper.znode.parent":sys.argv[3],           "hbase.mapreduce.inputtable": 
table}keyConv 
="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"valueConv
 = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter" 
hbase_rdd = sc.newAPIHadoopRDD(   
"org.apache.hadoop.hbase.mapreduce.TableInputFormat",   
"org.apache.hadoop.hbase.io.ImmutableBytesWritable",   
"org.apache.hadoop.hbase.client.Result",    keyConverter=keyConv,    
valueConverter=valueConv,    conf=conf)hbase_rdd = 
hbase_rdd.flatMapValues(lambda v:v.split("\n")).mapValues(json.loads)# 
hbase_rdd = hbase_rdd.flatMapValues(lambda v:v.split("\n"))# hbase_rdd = 
hbase_rdd.flatMapValues(lambda v:v.split("\n")).countByValue().items() output = 
hbase_rdd.collect()# output = hbase_rddfor (k, v) in output:    print((k, v)) 
sc.stop()



  

Reply via email to