Re: .cache() changes contents of RDD

2016-02-27 Thread Sabarish Sasidharan
This is because Hadoop Writables are being reused. Just map them to some custom type and then do further operations, including cache(), on the result.

Regards
Sab

On 27-Feb-2016 9:11 am, "Yan Yang" wrote:
> Hi
>
> I am pretty new to Spark, and after experimenting with our pipelines I
> ran into this weird
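A minimal sketch of the fix Sab describes, assuming a text input read through the new Hadoop API; the class name MyRecord and the paths are hypothetical:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// A plain Scala type that owns its data, so it is safe to cache
case class MyRecord(offset: Long, line: String)

val input = sc.newAPIHadoopFile(
  "hdfs:///path/to/input",                      // hypothetical path
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text])

// Copy each reused Writable into an immutable record before caching;
// k.get and v.toString both produce fresh values
val rdd = input.map { case (k, v) => MyRecord(k.get, v.toString) }
rdd.cache()
rdd.saveAsTextFile("hdfs:///path/to/output")    // hypothetical path

Because MyRecord holds a fresh Long and String, the cached rows no longer alias the single Writable buffer the record reader keeps reusing.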

Re: .cache() changes contents of RDD

2016-02-27 Thread Igor Berman
Are you using the Avro format by any chance? There are some formats that need to be "deep"-copied before caching or aggregating. Try something like:

val input = sc.newAPIHadoopRDD(...)
val rdd = input.map(deepCopyTransformation).map(...)
rdd.cache()
rdd.saveAsTextFile(...)

where deepCopyTransformation is f
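For the Avro case specifically, one possible shape of deepCopyTransformation is sketched below, assuming records arrive as AvroKey[MyAvroRecord], where MyAvroRecord stands in for a hypothetical generated specific-record class and the paths are placeholders:

import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.avro.specific.SpecificData
import org.apache.hadoop.io.NullWritable

val input = sc.newAPIHadoopFile(
  "hdfs:///path/to/input.avro",                 // hypothetical path
  classOf[AvroKeyInputFormat[MyAvroRecord]],
  classOf[AvroKey[MyAvroRecord]],
  classOf[NullWritable])

// Clone the record object the input format keeps reusing into a fresh copy
def deepCopyTransformation(pair: (AvroKey[MyAvroRecord], NullWritable)): MyAvroRecord = {
  val reused = pair._1.datum()
  SpecificData.get().deepCopy(reused.getSchema, reused)
}

val rdd = input.map(deepCopyTransformation)
rdd.cache()
rdd.saveAsTextFile("hdfs:///path/to/output")    // hypothetical path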

Re: .cache() changes contents of RDD

2016-02-26 Thread Ted Yu
Can you reveal what is done inside the map()? Which Spark release are you using?

Cheers

On Fri, Feb 26, 2016 at 7:41 PM, Yan Yang wrote:
> Hi
>
> I am pretty new to Spark, and after experimenting with our pipelines I
> ran into this weird issue.
>
> The Scala code is as below:
>
> val input

.cache() changes contents of RDD

2016-02-26 Thread Yan Yang
Hi

I am pretty new to Spark, and after experimenting with our pipelines I ran into this weird issue.

The Scala code is as below:

val input = sc.newAPIHadoopRDD(...)
val rdd = input.map(...)
rdd.cache()
rdd.saveAsTextFile(...)

I found rdd to consist of 80+K identical rows. To be more precise, t
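A minimal sketch of how this symptom can arise, assuming the input is read through the new Hadoop API (names and paths are hypothetical). Hadoop record readers reuse a single Writable instance per field, so caching raw references to those values materializes many pointers to the same mutable object:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val input = sc.newAPIHadoopFile(
  "hdfs:///path/to/input",                      // hypothetical path
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text])

// BUG: every cached element is a reference to the reader's one Text buffer,
// so once the partition is scanned, all rows can come out identical,
// showing whatever record was read last
val rdd = input.map { case (_, value) => value }
rdd.cache()
rdd.saveAsTextFile("hdfs:///path/to/output")    // hypothetical path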