I'm doing a mapPartitions on an RDD cached in memory, followed by a reduce. Here is my code snippet:
import scala.collection.mutable.ArrayBuffer

// myRdd is an RDD of (Int, Long) tuples
myRdd.mapPartitions(rangify).reduce((x, y) => (x._1 + y._1, x._2 ++ y._2))

// The rangify function: sums the Int components of a partition and collapses
// runs of consecutive Long components into (begin, end) ranges
def rangify(l: Iterator[(Int, Long)]): Iterator[(Long, List[ArrayBuffer[(Long, Long)]])] = {
  var sum = 0L
  val mylist = ArrayBuffer[(Long, Long)]()
  if (l.isEmpty)
    return List((0L, List[ArrayBuffer[(Long, Long)]]())).toIterator
  var prev = -1000L
  var begin = -1000L
  for (x <- l) {
    sum += x._1
    if (prev < 0) {
      // first element of the partition starts a new range
      prev = x._2
      begin = x._2
    } else if (x._2 == prev + 1)
      prev = x._2 // extends the current range
    else {
      mylist += ((begin, prev)) // gap found: close the current range
      prev = x._2
      begin = x._2
    }
  }
  mylist += ((begin, prev)) // close the last range
  List((sum, List(mylist))).toIterator
}

The RDD is cached in memory. I'm using 20 executors with 1 core each, and the cached RDD has 60 blocks. The problem is that in every 2-3 runs of the job, one task has an abnormally large deserialisation time. Screenshot attached.

Thank you,
Abhishek
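For reference, here is a minimal, Spark-free sketch (assuming Scala 2.12 or later) that runs rangify on a few in-memory "partitions" and combines the results the same way the reduce does. It makes the expected output easy to check independently of the cluster. The object name RangifyLocal, the helper merge, and the sample data are hypothetical and only for illustration. The sketch also swaps List(...).toIterator for the equivalent Iterator.single.

```scala
import scala.collection.mutable.ArrayBuffer

object RangifyLocal {
  type Acc = (Long, List[ArrayBuffer[(Long, Long)]])

  // Same logic as rangify in the post (with `list` corrected to `mylist`).
  // Note: the `prev < 0` sentinel assumes the Long components are non-negative.
  def rangify(l: Iterator[(Int, Long)]): Iterator[Acc] = {
    var sum = 0L
    val mylist = ArrayBuffer[(Long, Long)]()
    if (l.isEmpty) return Iterator.single((0L, List.empty[ArrayBuffer[(Long, Long)]]))
    var prev = -1000L
    var begin = -1000L
    for (x <- l) {
      sum += x._1
      if (prev < 0) { prev = x._2; begin = x._2 }           // first element
      else if (x._2 == prev + 1) prev = x._2                 // extend range
      else { mylist += ((begin, prev)); prev = x._2; begin = x._2 } // gap
    }
    mylist += ((begin, prev))
    Iterator.single((sum, List(mylist)))
  }

  // Hypothetical name for the reduce function: add sums, concatenate range lists.
  def merge(x: Acc, y: Acc): Acc = (x._1 + y._1, x._2 ++ y._2)

  def main(args: Array[String]): Unit = {
    // Hypothetical partitions standing in for the cached RDD's blocks.
    val partitions: Seq[Seq[(Int, Long)]] = Seq(
      Seq((1, 10L), (2, 11L), (3, 12L), (4, 20L)),
      Seq.empty,
      Seq((5, 30L), (6, 32L), (7, 33L))
    )
    val (total, ranges) = partitions.flatMap(p => rangify(p.iterator)).reduce(merge)
    println(total)                 // 28
    println(ranges.map(_.toList))  // List(List((10,12), (20,20)), List((30,30), (32,33)))
  }
}
```

Each non-empty partition contributes one ArrayBuffer of ranges, and an empty partition contributes (0, Nil), so the reduce simply concatenates per-partition results without merging ranges that touch across partition boundaries.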