How to retrieve the value from sql.Row by column name

2015-02-16 Thread Eric Bell
Is it possible to reference a column from a SchemaRDD using the column's 
name instead of its number?


For example, let's say I've created a SchemaRDD from an avro file:

val sqlContext = new SQLContext(sc)
import sqlContext._
val caper=sqlContext.avroFile("hdfs://localhost:9000/sma/raw_avro/caper")
caper.registerTempTable("caper")

scala> caper
res20: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at 
SchemaRDD.scala:108

== Query Plan ==
== Physical Plan ==
PhysicalRDD 
[ADMDISP#0,age#1,AMBSURG#2,apptdt_skew#3,APPTSTAT#4,APPTTYPE#5,ASSGNDUR#6,CANCSTAT#7,CAPERSTAT#8,COMPLAINT#9,CPT_1#10,CPT_10#11,CPT_11#12,CPT_12#13,CPT_13#14,CPT_2#15,CPT_3#16,CPT_4#17,CPT_5#18,CPT_6#19,CPT_7#20,CPT_8#21,CPT_9#22,CPTDX_1#23,CPTDX_10#24,CPTDX_11#25,CPTDX_12#26,CPTDX_13#27,CPTDX_2#28,CPTDX_3#29,CPTDX_4#30,CPTDX_5#31,CPTDX_6#32,CPTDX_7#33,CPTDX_8#34,CPTDX_9#35,CPTMOD1_1#36,CPTMOD1_10#37,CPTMOD1_11#38,CPTMOD1_12#39,CPTMOD1_13#40,CPTMOD1_2#41,CPTMOD1_3#42,CPTMOD1_4#43,CPTMOD1_5#44,CPTMOD1_6#45,CPTMOD1_7#46,CPTMOD1_8#47,CPTMOD1_9#48,CPTMOD2_1#49,CPTMOD2_10#50,CPTMOD2_11#51,CPTMOD2_12#52,CPTMOD2_13#53,CPTMOD2_2#54,CPTMOD2_3#55,CPTMOD2_4#56,CPTMOD...

scala>

Now I want to access fields, and of course the normal thing to do is to 
use a field name, not a field number.


scala> val kv = caper.map(r => (r.ran_id, r))
<console>:23: error: value ran_id is not a member of org.apache.spark.sql.Row

   val kv = caper.map(r => (r.ran_id, r))

How do I do this?




Re: How to retrieve the value from sql.Row by column name

2015-02-16 Thread Eric Bell
I am just learning Scala, so I don't yet understand what your code 
snippet is doing, but thank you. I will learn more so I can figure it out.


I am new to all of this and still trying to make the mental shift from 
normal programming to distributed programming, but it seems to me that 
the row object would know the schema it came from and could ask that 
schema to translate a name into a column number. Am I missing something, 
or is this just a matter of time constraints and this feature hasn't 
made it into the queue yet?


Barring that, do the schema classes provide methods for doing this? I've 
looked and didn't see anything.


I've just discovered that the Python implementation of SchemaRDD does 
in fact allow referencing a field by name as well as by position. Why is 
this provided in the Python implementation but not in the Scala or Java 
implementations?
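
For example, in PySpark something like this works (just a sketch; it 
assumes caper is the equivalent SchemaRDD on the Python side, with the 
same ran_id column):

# Rows in PySpark expose their columns as attributes, so keying by name works:
kv = caper.map(lambda r: (r.ran_id, r))
first = caper.first()
print(first.ran_id)   # attribute access works on a single Row as well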


Thanks,

--eric


On 02/16/2015 10:46 AM, Michael Armbrust wrote:
For efficiency, the row objects don't contain the schema, so you can't 
get the column by name directly. I usually do a select followed by 
pattern matching, something like the following:


caper.select('ran_id).map { case Row(ranId: String) => }







Spark newbie desires feedback on first program

2015-02-16 Thread Eric Bell
I'm a Spark newbie working on my first attempt at writing an ETL 
program. I could use some feedback to make sure I'm on the right path. 
I've written a basic proof of concept that runs without errors and seems 
to work, although I might be missing issues that will only show up when 
this is actually run on more than a single node.


I am working with data about people (actually healthcare patients). I 
have an RDD that contains multiple rows per person. My overall goal is 
to create a single Person object for each person in my data. In this 
example, I am serializing to JSON, mostly because this is what I know 
how to do at the moment.


Other than general feedback, is my use of the groupByKey() and 
mapValues() methods appropriate?


Thanks!


import json

class Person:
    def __init__(self):
        self.mydata = {}
        self.cpts = []
        self.mydata['cpt'] = self.cpts
    def addRowData(self, dataRow):
        # Get the CPT codes
        cpt = dataRow.CPT_1
        if cpt:
            self.cpts.append(cpt)
    def serializeToJSON(self):
        return json.dumps(self.mydata)

def makeAPerson(rows):
    person = Person()
    for row in rows:
        person.addRowData(row)
    return person.serializeToJSON()

peopleRDD = caper_kv.groupByKey().mapValues(
    lambda personDataRows: makeAPerson(personDataRows))

peopleRDD.saveAsTextFile("hdfs://localhost:9000/sma/processJSON/people")





Re: Spark newbie desires feedback on first program

2015-02-16 Thread Eric Bell
Thanks Charles. I just realized a few minutes ago that I neglected to 
show the step where I generated the key on the person ID. Thanks for the 
pointer on the HDFS URL.
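
That step was roughly just mapping each row to a (person ID, row) pair, 
something like this (PATIENT_ID is a stand-in here, since the actual 
column name isn't shown above):

# Rough sketch of the omitted keying step; PATIENT_ID is a placeholder for
# the real person-ID column:
caper_kv = caper.map(lambda row: (row.PATIENT_ID, row))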


The next step is to process data from multiple RDDs. My data originates 
from 7 tables in a MySQL database. I used Sqoop to create Avro files from 
these tables, and in turn created RDDs from the Avro files using Spark 
SQL. Since groupByKey only operates on a single RDD, I'm not quite sure 
yet how I'm going to process all 7 tables as transformations to get all 
the data I need into my objects.
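
One possible shape, assuming every table's rows can be keyed by the same 
person ID, would be to cogroup or union the keyed pair RDDs, for example:

# Sketch only: visits_kv and labs_kv are hypothetical pair RDDs built from
# two of the other tables, keyed by the same person ID as caper_kv.
grouped = caper_kv.cogroup(visits_kv)   # (personId, (caper rows, visit rows))
combined = sc.union([caper_kv, visits_kv, labs_kv]).groupByKey()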


I'm vacillating on whether I should be doing it this way, or whether it 
would be a lot simpler to query MySQL to get all the person IDs, 
parallelize them, and have my Person class query the MySQL database 
directly. Since in theory I only have to do this once, I'm not sure 
there's much to be gained by moving the data from MySQL to Spark first.


I have yet to find any non-trivial examples of ETL logic on the web ... 
it seems like it's mostly word count map-reduce replacements.


On 02/16/2015 01:32 PM, Charles Feduke wrote:
I cannot comment on the correctness of the Python code. I will assume 
your caper_kv is keyed on something that uniquely identifies all the 
rows that make up a person's record, so your groupByKey makes sense, 
as does the map. (I will also assume that all of the rows that comprise 
a single person's record will always fit in memory. If not, you will 
need another approach.)


You should be able to get away with removing the "localhost:9000" from 
your HDFS URL, i.e., using "hdfs:///sma/processJSON/people", and letting 
your HDFS configuration for Spark supply the missing pieces.
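
For example, the save at the end would then be just:

# Host and port are left to the HDFS/Spark configuration:
peopleRDD.saveAsTextFile("hdfs:///sma/processJSON/people")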




