Well, I kind of got it. The following works:

*****************
val rdd = sc.newAPIHadoopFile(path, classOf[AvroKeyInputFormat[GenericRecord]],
classOf[AvroKey[GenericRecord]], classOf[NullWritable]).map(_._1.datum)

rdd
  .map(record => {
    // Each element is already a GenericRecord after the map(_._1.datum) above,
    // so there is no tuple left to unpack here.
    println(record.get("myValue"))
  })
  .take(10)
*****************

It seems strange to me that I have to iterate over the RDD effectively
twice -- once to create the RDD, and again to perform my action. It also
seems strange that I can't actually access the data in my RDD until I've
copied the records. I would think this is a *very* common use case of an
RDD -- accessing the data it contains (otherwise, what's the point?).
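
The object reuse described in the SparkContext note quoted further down this thread can be reproduced without Spark or Avro. The sketch below is only an illustration: `Holder` and `reusingReader` are hypothetical names standing in for a reused datum and a Hadoop RecordReader, not real API.

```scala
// Hypothetical stand-in for a Hadoop Writable / Avro datum that a reader reuses.
final class Holder(var value: Int)

// Mimics a RecordReader: hands back the SAME Holder every time, mutated in place.
def reusingReader(n: Int): Iterator[Holder] = {
  val shared = new Holder(0)
  Iterator.tabulate(n) { i => shared.value = i; shared }
}

// Keeping the references: every list element points at the one shared object.
val aliased = reusingReader(3).toList.map(_.value)

// Copying inside the map, before the reader advances, preserves each value.
val copiedValues = reusingReader(3).map(h => new Holder(h.value)).toList.map(_.value)

println(aliased)      // List(2, 2, 2)
println(copiedValues) // List(0, 1, 2)
```

The copy has to happen while the iterator is still positioned on the record, which is why copying after take() or collect() comes too late.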

Is there a way to always enable cloning? There used to be a cloneRecords
parameter on the hadoopFile method, but that seems to have been removed.

Finally, if I add rdd.persist(), then it doesn't work. I guess I would need
to do .map(_._1.datum) again before the map that does the real work.
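
For the cloning and persist() questions above, one possible approach is to deep-copy each datum in the first map, so that nothing downstream ever holds the reader's reused object. This is a minimal sketch, not a confirmed fix: it assumes the spark-shell's `sc` and the `path` from the original message, and an Avro version that provides `GenericData.deepCopy`. `copied` is just an illustrative name.

```scala
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable

// Assumes `sc` (the spark-shell SparkContext) and `path` are already defined.
val copied = sc
  .newAPIHadoopFile(path,
    classOf[AvroKeyInputFormat[GenericRecord]],
    classOf[AvroKey[GenericRecord]],
    classOf[NullWritable])
  .map { case (key, _) =>
    val datum = key.datum()
    // Deep-copy so each element is its own object, not the reader's reused one.
    GenericData.get().deepCopy(datum.getSchema, datum)
  }

// Assumption: the default storage level keeps deserialized objects in memory,
// so the cached elements are the distinct copies made above.
copied.persist()

// Convert to String on the executors so only plain strings reach the driver.
copied.map(r => String.valueOf(r.get("myValue"))).take(10).foreach(println)
```

Serialized storage levels and shuffles may need extra serializer configuration (for example Kryo), since GenericRecord is not Java-serializable in every Avro version.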


--
Chris Miller

On Sat, Mar 12, 2016 at 4:15 PM, Chris Miller <cmiller11...@gmail.com>
wrote:

> Wow! That sure is buried in the documentation! But yeah, that's what I
> thought more or less.
>
> I tried copying as follows, but that didn't work.
>
> *****************
> val copyRDD = singleFileRDD.map(_.copy())
> *****************
>
> When I iterate over the new copyRDD (foreach or map), I still have the
> same problem of duplicate records. I also tried copying within the block
> where I'm using it, but that didn't work either:
>
> *****************
> rdd
>   .take(10)
>   .collect()
>   .map(item => {
>     val copied = item.copy()
>     val record = copied._1.datum()
>
>     println(record.get("myValue"))
>   })
> *****************
>
> What am I doing wrong?
>
> --
> Chris Miller
>
> On Sat, Mar 12, 2016 at 1:48 PM, Peyman Mohajerian <mohaj...@gmail.com>
> wrote:
>
>> Here is the reason for the behavior:
>> '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable
>> object for each record, directly caching the returned RDD or directly
>> passing it to an aggregation or shuffle operation will create many
>> references to the same object. If you plan to directly cache, sort, or
>> aggregate Hadoop writable objects, you should first copy them using a map
>> function.
>>
>>
>> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/SparkContext.html
>>
>> So it is Hadoop related.
>>
>> On Fri, Mar 11, 2016 at 3:19 PM, Chris Miller <cmiller11...@gmail.com>
>> wrote:
>>
>>> I have a bit of a strange situation:
>>>
>>> *****************
>>> import org.apache.avro.generic.{GenericData, GenericRecord}
>>> import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper, AvroKey}
>>> import org.apache.avro.mapreduce.AvroKeyInputFormat
>>> import org.apache.hadoop.io.{NullWritable, WritableUtils}
>>>
>>> val path = "/path/to/data.avro"
>>>
>>> val rdd = sc.newAPIHadoopFile(path,
>>> classOf[AvroKeyInputFormat[GenericRecord]],
>>> classOf[AvroKey[GenericRecord]], classOf[NullWritable])
>>> rdd.take(10).foreach( x => println( x._1.datum() ))
>>> *****************
>>>
>>> In this situation, I get the right number of records returned, and if I
>>> look at the contents of rdd I see the individual records as tuple2's...
>>> however, if I println on each one as shown above, I get the same result
>>> every time.
>>>
>>> Apparently this has to do with something in Spark or Avro keeping a
>>> reference to the item it's iterating over, so I need to clone the object
>>> before I use it. However, if I try to clone it (from the spark-shell
>>> console), I get:
>>>
>>> *****************
>>> rdd.take(10).foreach( x => {
>>>   val clonedDatum = x._1.datum().clone()
>>>   println(clonedDatum.datum())
>>> })
>>>
>>> <console>:37: error: method clone in class Object cannot be accessed in
>>> org.apache.avro.generic.GenericRecord
>>>  Access to protected method clone not permitted because
>>>  prefix type org.apache.avro.generic.GenericRecord does not conform to
>>>  class $iwC where the access take place
>>>                 val clonedDatum = x._1.datum().clone()
>>> *****************
>>>
>>> So, how can I clone the datum?
>>>
>>> Seems I'm not the only one who ran into this problem:
>>> https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/102. I
>>> can't figure out how to fix it in my case without hacking away like the
>>> person in the linked issue did.
>>>
>>> Suggestions?
>>>
>>> --
>>> Chris Miller
>>>
>>
>>
>
