Hi,

Is there a recommended way of serializing Hadoop Writables in Spark?
Here is my problem.

Question 1:
I have a pair RDD, RDD[(LongWritable, BytesWritable)], which is created by
reading a SequenceFile[LongWritable, BytesWritable].

I have these two settings in my Spark conf:
spark.serializer=org.apache.spark.serializer.KryoSerializer 
spark.kryo.registrator=MyCustomRegistrator
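
In code, that is roughly the following (the registrator's package name here is
just a placeholder):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.example.MyCustomRegistrator") // placeholder package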

Inside MyCustomRegistrator, I registered both the LongWritable and
BytesWritable classes:
kryo.register(classOf[LongWritable])
kryo.register(classOf[BytesWritable])
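
For completeness, the registrator itself is just a plain KryoRegistrator along
these lines (the class body below is a sketch of what I have):

import com.esotericsoftware.kryo.Kryo
import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.spark.serializer.KryoRegistrator

class MyCustomRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[LongWritable])
    kryo.register(classOf[BytesWritable])
  }
}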

The total size of the sequence file that I read to create the
RDD[(LongWritable, BytesWritable)] is 800 MB. I have 10 executors in my job,
each with 10 GB of memory. When I perform a reduceByKey operation on this RDD,
I see huge shuffle writes of 10 GB on each executor, which doesn't make any
sense. The reduceByKey stage is also very slow, and the executors sometimes
throw OOM exceptions.
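
For context, the job is built roughly like this; the input path and the merge
function below are simplified placeholders (my real combine logic is more
involved):

import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.spark.rdd.RDD

// sc is the job's SparkContext; the path is a placeholder
val rdd: RDD[(LongWritable, BytesWritable)] =
  sc.sequenceFile("/path/to/input", classOf[LongWritable], classOf[BytesWritable])

// Placeholder merge function, just to show the shape of the reduceByKey call
val reduced = rdd.reduceByKey((a, b) => if (a.getLength >= b.getLength) a else b)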

Can someone explain this shuffle behavior in Spark? Why does Spark show
100 GB of total shuffle writes for 800 MB of input data?
Also, when I convert the RDD[(LongWritable, BytesWritable)] to an
RDD[(Long, CustomObject)], the reduceByKey operation takes only 30 seconds
to finish and is very fast.
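
The conversion I mean looks roughly like this (CustomObject below is just a
stand-in for my real class):

import java.util.Arrays

case class CustomObject(payload: Array[Byte])

val converted: RDD[(Long, CustomObject)] = rdd.map { case (k, v) =>
  // Copy only the valid portion of the BytesWritable's backing array
  (k.get(), CustomObject(Arrays.copyOfRange(v.getBytes, 0, v.getLength)))
}
val reducedFast = converted.reduceByKey((a, b) => a) // placeholder combine function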

Question 2:
For the same job, I then wrote custom Kryo serializers for LongWritable and
BytesWritable. Here is the code.


import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.hadoop.io.{BytesWritable, LongWritable}
/**
  * Kryo Custom Serializer for serializing LongWritable
  */
class LongWritableSerializer extends Serializer[LongWritable] {
  override def write(kryo: Kryo, output: Output, obj: LongWritable): Unit = {
    // Only the underlying long value needs to be written
    output.writeLong(obj.get())
  }

  override def read(kryo: Kryo,
                    input: Input,
                    clazz: Class[LongWritable]): LongWritable = {
    val longVal = input.readLong()
    new LongWritable(longVal)
  }
}
/**
  * Kryo Custom Serializer for serializing BytesWritable
  */
class BytesWritableSerializer extends Serializer[BytesWritable] {
  override def write(kryo: Kryo, output: Output, obj: BytesWritable): Unit = {
    // getBytes returns the (possibly padded) backing array, so write only
    // the valid portion reported by getLength
    val length = obj.getLength
    output.writeInt(length)
    output.writeBytes(obj.getBytes, 0, length)
  }

  override def read(kryo: Kryo,
                    input: Input,
                    clazz: Class[BytesWritable]): BytesWritable = {
    val length = input.readInt()
    val bytes = input.readBytes(length)
    new BytesWritable(bytes)
  }
}



I then registered these serializers with Kryo inside MyCustomRegistrator:
kryo.register(classOf[LongWritable], new LongWritableSerializer())
kryo.register(classOf[BytesWritable], new BytesWritableSerializer())
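
As a sanity check, I would expect a standalone round trip through these
serializers (outside Spark) to work; a quick sketch of such a test, not taken
from my actual job:

import java.io.ByteArrayOutputStream
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.hadoop.io.BytesWritable

val kryo = new Kryo()
kryo.register(classOf[BytesWritable], new BytesWritableSerializer())

// Serialize a small BytesWritable to a byte array
val baos = new ByteArrayOutputStream()
val out = new Output(baos)
kryo.writeObject(out, new BytesWritable(Array[Byte](1, 2, 3)))
out.close()

// Deserialize it back and check the length
val in = new Input(baos.toByteArray)
val back = kryo.readObject(in, classOf[BytesWritable])
in.close()
assert(back.getLength == 3)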

I still see the same shuffle behavior with the custom serializers. Can someone
check this as well?


Thanks,
Pradeep






