Well, I kind of got it... this works below:

*****************
val rdd = sc.newAPIHadoopFile(path,
    classOf[AvroKeyInputFormat[GenericRecord]],
    classOf[AvroKey[GenericRecord]],
    classOf[NullWritable])
  .map(_._1.datum)

rdd
  .map(record => {
    println(record.get("myValue"))
  })
  .take(10)
*****************

Seems strange to me that I have to iterate over the RDD effectively twice -- once to create the RDD, and again to perform my action. It also seems strange that I can't actually access the data in my RDD until I've copied the records. I would think this is a *very* common use case of an RDD -- accessing the data it contains (otherwise, what's the point?). Is there a way to always enable cloning? There used to be a cloneRecords parameter on the hadoopFile method, but that seems to have been removed.

Finally, if I add rdd.persist(), then it doesn't work. I guess I would need to do .map(_._1.datum) again before the map that does the real work.
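In case it helps frame the persist question: the shape I'd expect a cache-safe version to take is something like the untested sketch below. It deep-copies each datum with Avro's GenericData.get().deepCopy while the record is still current, so the resulting RDD (copiedRdd is just my name for it) holds independent objects and should survive persist():

*****************
import org.apache.avro.generic.{GenericData, GenericRecord}

val copiedRdd = sc.newAPIHadoopFile(path,
    classOf[AvroKeyInputFormat[GenericRecord]],
    classOf[AvroKey[GenericRecord]],
    classOf[NullWritable])
  .map { case (key, _) =>
    val datum = key.datum()
    // copy before the RecordReader reuses the underlying object
    GenericData.get().deepCopy(datum.getSchema, datum)
  }

copiedRdd.persist()
copiedRdd.take(10).foreach(record => println(record.get("myValue")))
*****************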
--
Chris Miller

On Sat, Mar 12, 2016 at 4:15 PM, Chris Miller <cmiller11...@gmail.com> wrote:

> Wow! That sure is buried in the documentation! But yeah, that's what I
> thought more or less.
>
> I tried copying as follows, but that didn't work.
>
> *****************
> val copyRDD = singleFileRDD.map(_.copy())
> *****************
>
> When I iterate over the new copyRDD (foreach or map), I still have the
> same problem of duplicate records. I also tried copying within the block
> where I'm using it, but that didn't work either:
>
> *****************
> rdd
>   .take(10)
>   .map(item => {
>     val copied = item.copy()
>     val record = copied._1.datum()
>
>     println(record.get("myValue"))
>   })
> *****************
>
> What am I doing wrong?
>
> --
> Chris Miller
>
> On Sat, Mar 12, 2016 at 1:48 PM, Peyman Mohajerian <mohaj...@gmail.com> wrote:
>
>> Here is the reason for the behavior:
>> '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable
>> object for each record, directly caching the returned RDD or directly
>> passing it to an aggregation or shuffle operation will create many
>> references to the same object. If you plan to directly cache, sort, or
>> aggregate Hadoop writable objects, you should first copy them using a map
>> function.
>>
>> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/SparkContext.html
>>
>> So it is Hadoop related.
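(Side note on that doc paragraph: for plain Hadoop Writables, "copy them using a map function" can be as simple as converting each record to immutable values before caching. The sketch below is purely illustrative -- the SequenceFile path and variable names are made up, and it isn't the Avro case from this thread:)

*****************
import org.apache.hadoop.io.{LongWritable, Text}

val pairs = sc.sequenceFile("/path/to/data.seq",
  classOf[LongWritable], classOf[Text])

// Wrong: pairs.cache() would store many references to the reused Writables.
// Copy/convert each record in a map first, then cache the result.
val safe = pairs.map { case (k, v) => (k.get(), v.toString) }
safe.cache()
*****************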
>>
>> On Fri, Mar 11, 2016 at 3:19 PM, Chris Miller <cmiller11...@gmail.com> wrote:
>>
>>> I have a bit of a strange situation:
>>>
>>> *****************
>>> import org.apache.avro.generic.{GenericData, GenericRecord}
>>> import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper, AvroKey}
>>> import org.apache.avro.mapreduce.AvroKeyInputFormat
>>> import org.apache.hadoop.io.{NullWritable, WritableUtils}
>>>
>>> val path = "/path/to/data.avro"
>>>
>>> val rdd = sc.newAPIHadoopFile(path,
>>>   classOf[AvroKeyInputFormat[GenericRecord]],
>>>   classOf[AvroKey[GenericRecord]], classOf[NullWritable])
>>> rdd.take(10).foreach( x => println( x._1.datum() ))
>>> *****************
>>>
>>> In this situation, I get the right number of records returned, and if I
>>> look at the contents of rdd I see the individual records as Tuple2s...
>>> however, if I println on each one as shown above, I get the same result
>>> every time.
>>>
>>> Apparently this has to do with something in Spark or Avro keeping a
>>> reference to the item it's iterating over, so I need to clone the object
>>> before I use it. However, if I try to clone it (from the spark-shell
>>> console), I get:
>>>
>>> *****************
>>> rdd.take(10).foreach( x => {
>>>   val clonedDatum = x._1.datum().clone()
>>>   println(clonedDatum)
>>> })
>>>
>>> <console>:37: error: method clone in class Object cannot be accessed in
>>> org.apache.avro.generic.GenericRecord
>>> Access to protected method clone not permitted because
>>> prefix type org.apache.avro.generic.GenericRecord does not conform to
>>> class $iwC where the access take place
>>> val clonedDatum = x._1.datum().clone()
>>> *****************
>>>
>>> So, how can I clone the datum?
>>>
>>> Seems I'm not the only one who ran into this problem:
>>> https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/102. I
>>> can't figure out how to fix it in my case without hacking away like the
>>> person in the linked PR did.
>>>
>>> Suggestions?
>>>
>>> --
>>> Chris Miller
>>
>>
>
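PS: One way to sidestep the protected clone() that the quoted message runs into -- untested, and assuming the datum really is a GenericData.Record (which GenericDatumReader produces by default) -- is the Record copy constructor, applied in a map over the (AvroKey, NullWritable) rdd from that message, before take() collects anything:

*****************
import org.apache.avro.generic.GenericData

rdd
  .map(x => new GenericData.Record(
    x._1.datum().asInstanceOf[GenericData.Record], true))  // true = deep copy
  .take(10)
  .foreach(record => println(record.get("myValue")))
*****************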