As I understand it, it comes down to how Hadoop FileInputFormats work, and to questions of mutability. If you were to read a file from Hadoop via an InputFormat in a simple Java program, the InputFormat's RecordReader creates a single, mutable instance of the Writable key class and a single, mutable instance of the Writable value class. As you loop through the records, the RecordReader reuses those same Writable instances, deserializing the underlying bytes from the file into them one record at a time. It's then up to the application to copy whatever it needs out of those Writable instances into something else if it wants to keep or use the values later.
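In code terms, the pattern looks roughly like this. (A simplified sketch in Scala, using SequenceFile.Reader directly rather than the full InputFormat/RecordReader machinery; the path and the IntWritable/Text key/value types are just placeholders.)

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{IntWritable, SequenceFile, Text}
    import scala.collection.mutable.ArrayBuffer

    val conf = new Configuration()
    // "/tmp/example.seq" is a placeholder path
    val reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path("/tmp/example.seq")))
    val key = new IntWritable()
    val value = new Text()
    val copies = ArrayBuffer.empty[(Int, String)]
    while (reader.next(key, value)) {
      // next() deserializes the current record's bytes into the SAME key/value
      // instances on every call, so copy their contents out before reading on
      copies += ((key.get(), value.toString))
    }
    reader.close()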
It's exactly the same when using Spark as the application. When you create an RDD of Writable objects by calling .sequenceFile, the RDD contains many references to the exact same object instance. So when Spark does a sort, cache or shuffle, it (I believe) optimizes on the assumption that objects are immutable. That is why the map step is necessary: it creates a distinct, immutable copy of each record (see the sketch at the bottom of this message). This is really an issue with the Hadoop InputFormat classes. If you can write a way of reading files from HDFS that doesn't use Hadoop's classes (though I'm not sure why you would; a simple map is far easier), then the map would potentially be unnecessary.

Andrew

From: jeff saremi [mailto:jeffsar...@hotmail.com]
Sent: 19 November 2015 05:35
To: Jeff Zhang <zjf...@gmail.com>
Cc: dev@spark.apache.org
Subject: RE: SequenceFile and object reuse

You're not seeing the issue because you perform one additional "map":

    map{case (k, v) => (k.get(), v.toString)}

Instead of being able to use the Text that was read, you had to create a tuple out of the string of the text. That is exactly why I asked this question. Why do we have to do this additional processing? What is the rationale behind it? Are there other ways of reading a Hadoop file (or any other file) that would not incur this additional step?
thanks

_____
Date: Thu, 19 Nov 2015 13:26:31 +0800
Subject: Re: FW: SequenceFile and object reuse
From: zjf...@gmail.com
To: jeffsar...@hotmail.com
CC: dev@spark.apache.org

Would this be an issue on the raw data? I use the following simple code and don't hit the issue you mentioned. Or it would be better to share your code.

    val rdd = sc.sequenceFile("/Users/hadoop/Temp/Seq", classOf[IntWritable], classOf[Text])
    rdd.map{case (k, v) => (k.get(), v.toString)}.collect() foreach println

On Thu, Nov 19, 2015 at 12:04 PM, jeff saremi <jeffsar...@hotmail.com> wrote:

I sent this to the user forum. I got no responses. Could someone here please help?
thanks
jeff

_____
From: jeffsar...@hotmail.com
To: u...@spark.apache.org
Subject: SequenceFile and object reuse
Date: Fri, 13 Nov 2015 13:29:58 -0500

So we tried reading a sequence file in Spark and realized that all our records ended up becoming the same. Then one of us found this:

    Note: Because Hadoop's RecordReader class re-uses the same Writable object for each record, directly caching the returned RDD or directly passing it to an aggregation or shuffle operation will create many references to the same object. If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first copy them using a map function.

Is there anyone who can shed some light on this bizarre behavior and the decisions behind it? And I would also like to know if anyone has been able to read a binary file without incurring the additional map() suggested above. What format did you use?
thanks
Jeff

--
Best Regards

Jeff Zhang
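P.S. For completeness, here is a minimal sketch of the copy-on-read pattern at the Spark level, assuming an IntWritable/Text sequence file like the one in Jeff Zhang's snippet (the path is a placeholder):

    import org.apache.hadoop.io.{IntWritable, Text}

    val raw = sc.sequenceFile("/tmp/example.seq", classOf[IntWritable], classOf[Text])

    // Risky: caching or shuffling `raw` directly stores many references to the
    // same reused Writable instances, so a later action can see every record
    // carrying the last value that was read.
    // raw.cache()

    // Safe: copy the mutable Writables into immutable Scala values first.
    val copied = raw.map { case (k, v) => (k.get(), v.toString) }
    copied.cache()
    copied.collect().foreach(println)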