Hi,

Is there a recommended way of serializing Hadoop Writables in Spark? Here is my problem.
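For context, here is roughly how the job is set up. The object name, app name and input path below are placeholders; the Kryo settings and the registrator are the relevant parts:

import com.esotericsoftware.kryo.Kryo
import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.{SparkConf, SparkContext}

// Registrator referenced by spark.kryo.registrator
// (Question 1 version: plain registration, no custom serializers)
class MyCustomRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[LongWritable])
    kryo.register(classOf[BytesWritable])
  }
}

object WritableShuffleTest {  // placeholder object name
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("writable-shuffle-test")  // placeholder app name
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "MyCustomRegistrator")
    val sc = new SparkContext(conf)

    // Read the SequenceFile[LongWritable, BytesWritable]; the path is a placeholder
    val rdd: RDD[(LongWritable, BytesWritable)] =
      sc.sequenceFile("/path/to/input", classOf[LongWritable], classOf[BytesWritable])

    // ... reduceByKey and the rest of the job
  }
}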
Question 1:

I have a pair RDD, RDD[(LongWritable, BytesWritable)], created by reading a SequenceFile[LongWritable, BytesWritable]. I have these two settings in my Spark conf:

spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator=MyCustomRegistrator

Inside MyCustomRegistrator I register both LongWritable and BytesWritable (see the setup sketch above):

kryo.register(classOf[LongWritable])
kryo.register(classOf[BytesWritable])

The total size of the SequenceFile[LongWritable, BytesWritable] that I read to create the RDD[(LongWritable, BytesWritable)] is *800MB*. The job has 10 executors with 10GB of memory each. When I run reduceByKey on this RDD, I see huge shuffle writes of about 10GB per executor (roughly 100GB in total), which doesn't make any sense. The reduceByKey stage is also very slow, and executors sometimes throw OOM exceptions. Can someone explain this shuffle behavior in Spark? Why does Spark show ~100GB of shuffle writes for 800MB of input data?

Also, when I convert the RDD[(LongWritable, BytesWritable)] to RDD[(Long, CustomObject)] before the reduceByKey, the operation finishes in about 30 seconds and is very fast (a simplified version of that conversion is in the P.S. below).

Question 2:

For the same job I then wrote custom Kryo serializers for LongWritable and BytesWritable. Here is the code:

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.hadoop.io.{BytesWritable, LongWritable}

/**
 * Kryo custom serializer for LongWritable.
 */
class LongWritableSerializer extends Serializer[LongWritable] {
  override def write(kryo: Kryo, output: Output, obj: LongWritable): Unit = {
    output.writeLong(obj.get())
  }

  override def read(kryo: Kryo, input: Input, clazz: Class[LongWritable]): LongWritable = {
    val longVal = input.readLong()
    new LongWritable(longVal)
  }
}

/**
 * Kryo custom serializer for BytesWritable.
 */
class BytesWritableSerializer extends Serializer[BytesWritable] {
  override def write(kryo: Kryo, output: Output, obj: BytesWritable): Unit = {
    // Write only the valid bytes: getBytes returns the full backing array,
    // which can be longer than getLength.
    val length = obj.getLength
    output.writeInt(length)
    output.writeBytes(obj.getBytes, 0, length)
  }

  override def read(kryo: Kryo, input: Input, clazz: Class[BytesWritable]): BytesWritable = {
    val length = input.readInt()
    val bytes = input.readBytes(length)
    new BytesWritable(bytes)
  }
}

I then registered these with Kryo inside MyCustomRegistrator:

kryo.register(classOf[LongWritable], new LongWritableSerializer())
kryo.register(classOf[BytesWritable], new BytesWritableSerializer())

I still see the same behavior. Can someone also check this?

Thanks,
Pradeep
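P.S. For reference, the conversion to RDD[(Long, CustomObject)] mentioned under Question 1 looks roughly like this. CustomObject is shown here as a bare placeholder wrapper; the real class has more fields, and the actual reduce function is omitted:

import java.util.Arrays

import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.spark.rdd.RDD

// Bare placeholder for the real CustomObject
case class CustomObject(payload: Array[Byte])

def convert(rdd: RDD[(LongWritable, BytesWritable)]): RDD[(Long, CustomObject)] =
  rdd.map { case (key, value) =>
    // Copy only the valid bytes out of the BytesWritable
    // (getBytes returns the full backing array, getLength the valid size)
    val bytes = Arrays.copyOfRange(value.getBytes, 0, value.getLength)
    (key.get(), CustomObject(bytes))
  }

// reduceByKey on the converted RDD is the step that finishes in ~30 seconds.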