How to retrieve the value from sql.Row by column name
Is it possible to reference a column from a SchemaRDD using the column's name instead of its number?

For example, let's say I've created a SchemaRDD from an Avro file:

    val sqlContext = new SQLContext(sc)
    import sqlContext._
    val caper = sqlContext.avroFile("hdfs://localhost:9000/sma/raw_avro/caper")
    caper.registerTempTable("caper")

    scala> caper
    res20: org.apache.spark.sql.SchemaRDD =
    SchemaRDD[0] at RDD at SchemaRDD.scala:108
    == Query Plan ==
    == Physical Plan ==
    PhysicalRDD [ADMDISP#0,age#1,AMBSURG#2,apptdt_skew#3,APPTSTAT#4,APPTTYPE#5,ASSGNDUR#6,CANCSTAT#7,CAPERSTAT#8,COMPLAINT#9,CPT_1#10,CPT_10#11,CPT_11#12,CPT_12#13,CPT_13#14,CPT_2#15,CPT_3#16,CPT_4#17,CPT_5#18,CPT_6#19,CPT_7#20,CPT_8#21,CPT_9#22,CPTDX_1#23,CPTDX_10#24,CPTDX_11#25,CPTDX_12#26,CPTDX_13#27,CPTDX_2#28,CPTDX_3#29,CPTDX_4#30,CPTDX_5#31,CPTDX_6#32,CPTDX_7#33,CPTDX_8#34,CPTDX_9#35,CPTMOD1_1#36,CPTMOD1_10#37,CPTMOD1_11#38,CPTMOD1_12#39,CPTMOD1_13#40,CPTMOD1_2#41,CPTMOD1_3#42,CPTMOD1_4#43,CPTMOD1_5#44,CPTMOD1_6#45,CPTMOD1_7#46,CPTMOD1_8#47,CPTMOD1_9#48,CPTMOD2_1#49,CPTMOD2_10#50,CPTMOD2_11#51,CPTMOD2_12#52,CPTMOD2_13#53,CPTMOD2_2#54,CPTMOD2_3#55,CPTMOD2_4#56,CPTMOD...

Now I want to access fields, and of course the normal thing to do is to use a field name, not a field number:

    scala> val kv = caper.map(r => (r.ran_id, r))
    <console>:23: error: value ran_id is not a member of org.apache.spark.sql.Row
           val kv = caper.map(r => (r.ran_id, r))

How do I do this?
Re: How to retrieve the value from sql.Row by column name
I am just learning Scala, so I don't actually understand what your code snippet is doing, but thank you; I will learn more so I can figure it out.

I am new to all of this and still trying to make the mental shift from normal programming to distributed programming, but it seems to me that a row object could keep a reference to the schema it came from and ask that schema to translate a name into a column number. Am I missing something, or is this just a matter of time constraints and it hasn't made it into the queue yet?

Barring that, do the schema classes provide methods for doing this? I've looked and didn't see anything.

I've just discovered that the Python implementation of SchemaRDD does in fact allow referencing a field by name as well as by column number. Why is this provided in the Python implementation but not in the Scala or Java implementations?

Thanks,
--eric

On 02/16/2015 10:46 AM, Michael Armbrust wrote:
> For efficiency the row objects don't contain the schema, so you can't get
> the column by name directly. I usually do a select followed by pattern
> matching. Something like the following:
>
>     caper.select('ran_id).map { case Row(ranId: String) => }
>
> On Mon, Feb 16, 2015 at 8:54 AM, Eric Bell <mailto:e...@ericjbell.com> wrote:
>> Is it possible to reference a column from a SchemaRDD using the column's
>> name instead of its number? [...]
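Eric's note about the Python API can be illustrated with a small, self-contained PySpark sketch (Spark 1.2-era API); the ran_id column and the sample values below are only illustrative stand-ins for the Avro-backed table:

    # Minimal sketch: in PySpark, SchemaRDD rows behave like named tuples,
    # so a column can be read by name. Names and values here are made up.
    from pyspark import SparkContext
    from pyspark.sql import SQLContext, Row

    sc = SparkContext(appName="row-by-name-sketch")
    sqlContext = SQLContext(sc)

    # Stand-in for the Avro-backed "caper" SchemaRDD from the thread.
    rows = sc.parallelize([Row(ran_id="A1", age=40), Row(ran_id="B2", age=35)])
    caper = sqlContext.inferSchema(rows)

    # Key each row by its ran_id column, accessed by name rather than index.
    kv = caper.map(lambda r: (r.ran_id, r))
    print(kv.collect())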
Spark newbie desires feedback on first program
I'm a Spark newbie working on my first attempt to write an ETL program, and I could use some feedback to make sure I'm on the right path. I've written a basic proof of concept that runs without errors and seems to work, although I might be missing some issues when this is actually run on more than a single node.

I am working with data about people (actually healthcare patients). I have an RDD that contains multiple rows per person. My overall goal is to create a single Person object for each person in my data. In this example, I am serializing to JSON, mostly because this is what I know how to do at the moment.

Other than general feedback, is my use of the groupByKey() and mapValues() methods appropriate?

Thanks!

    import json

    class Person:
        def __init__(self):
            self.mydata = {}
            self.cpts = []
            self.mydata['cpt'] = self.cpts

        def addRowData(self, dataRow):
            # Get the CPT codes
            cpt = dataRow.CPT_1
            if cpt:
                self.cpts.append(cpt)

        def serializeToJSON(self):
            return json.dumps(self.mydata)

    def makeAPerson(rows):
        person = Person()
        for row in rows:
            person.addRowData(row)
        return person.serializeToJSON()

    peopleRDD = caper_kv.groupByKey().mapValues(
        lambda personDataRows: makeAPerson(personDataRows))

    peopleRDD.saveAsTextFile("hdfs://localhost:9000/sma/processJSON/people")
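The snippet above assumes an already-keyed RDD named caper_kv; a plausible version of that missing step (the ran_id column name is borrowed from the earlier thread and may not match the real person identifier) would be:

    # Assumed keying step, not shown above: pair each row of the SchemaRDD
    # with its person identifier so groupByKey can gather a person's rows.
    # `caper` is the SchemaRDD loaded from Avro; `ran_id` is only a guess.
    caper_kv = caper.map(lambda row: (row.ran_id, row))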
Re: Spark newbie desires feedback on first program
Thanks Charles. I just realized a few minutes ago that I neglected to show the step where I generated the key on the person ID. Thanks for the pointer on the HDFS URL.

The next step is to process data from multiple RDDs. My data originates from 7 tables in a MySQL database. I used Sqoop to create Avro files from these tables, and in turn created RDDs from the Avro files using Spark SQL. Since groupByKey only operates on a single RDD, I'm not quite sure yet how I'm going to process 7 tables as a transformation to get all the data I need into my objects.

I'm vacillating on whether I should be doing it this way, or whether it would be a lot simpler to query MySQL for all the person IDs, parallelize them, and have my Person class make queries directly against the MySQL database. Since in theory I only have to do this once, I'm not sure there's much to be gained by moving the data from MySQL to Spark first.

I have yet to find any non-trivial examples of ETL logic on the web ... it seems like it's mostly word-count map-reduce replacements.

On 02/16/2015 01:32 PM, Charles Feduke wrote:
> I cannot comment about the correctness of Python code.
>
> I will assume your caper_kv is keyed on something that uniquely identifies
> all the rows that make up the person's record, so your group by key makes
> sense, as does the map. (I will also assume all of the rows that comprise a
> single person's record will always fit in memory. If not you will need
> another approach.)
>
> You should be able to get away with removing the "localhost:9000" from your
> HDFS URL, i.e., "hdfs:///sma/processJSON/people", and let your HDFS
> configuration for Spark supply the missing pieces.
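One hedged way to approach the seven-table question above, assuming each table carries the same person-ID column (guessed here as ran_id), is to key every SchemaRDD the same way and union them before the groupByKey:

    # Sketch only: the table names (labs, meds, ...) are placeholders, and
    # Person.addRowData would need to handle rows from each table rather
    # than looking only at CPT_1.
    def key_by_person(schema_rdd):
        return schema_rdd.map(lambda row: (row.ran_id, row))

    # caper, labs, meds, ... are SchemaRDDs created from the Sqoop'd Avro files.
    keyed = [key_by_person(t) for t in (caper, labs, meds)]

    # A single (person_id, row) RDD across all tables; groupByKey then collects
    # every row belonging to a person, regardless of which table it came from.
    all_rows = sc.union(keyed)
    peopleRDD = all_rows.groupByKey().mapValues(makeAPerson)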