Thanks Ted. After searching for a whole day, I still don't know how to make Spark use Twitter chill for serialization - there is very little documentation about how to integrate Twitter chill into Spark. I tried the following, but an exception of "java.lang.ClassCastException: com.twitter.chill.WrappedArraySerializer cannot be cast to org.apache.spark.serializer.Serializer" was thrown:
    val conf = new SparkConf()
      .setAppName("Test Serialization")
      .set("spark.serializer", "com.twitter.chill.WrappedArraySerializer")

Well, what is the correct way of configuring Spark to use the Twitter chill serialization framework?

2015-02-15 22:23 GMT+08:00 Ted Yu <yuzhih...@gmail.com>:

> I was looking at https://github.com/twitter/chill
>
> It seems this would achieve what you want:
> chill-scala/src/main/scala/com/twitter/chill/WrappedArraySerializer.scala
>
> Cheers
>
> On Sat, Feb 14, 2015 at 6:36 PM, Tao Xiao <xiaotao.cs....@gmail.com>
> wrote:
>
>> I'm using Spark 1.1.0 and find that *ImmutableBytesWritable* can be
>> serialized by Kryo but *Array[ImmutableBytesWritable]* can't be
>> serialized even when I registered both of them in Kryo.
>>
>> The code is as follows:
>>
>>     val conf = new SparkConf()
>>       .setAppName("Hello Spark")
>>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>       .set("spark.kryo.registrator", "xt.MyKryoRegistrator")
>>
>>     val sc = new SparkContext(conf)
>>
>>     val rdd = sc.parallelize(List(
>>       (new ImmutableBytesWritable(Bytes.toBytes("AAA")), new KeyValue()),
>>       (new ImmutableBytesWritable(Bytes.toBytes("BBB")), new KeyValue()),
>>       (new ImmutableBytesWritable(Bytes.toBytes("CCC")), new KeyValue()),
>>       (new ImmutableBytesWritable(Bytes.toBytes("DDD")), new KeyValue())), 4)
>>
>>     // snippet 1: a single *ImmutableBytesWritable* object can be
>>     // serialized in a broadcast
>>     val partitioner = new SingleElementPartitioner(
>>       sc.broadcast(new ImmutableBytesWritable(Bytes.toBytes(3))))
>>     val ret = rdd.aggregateByKey(List[KeyValue](), partitioner)(
>>       (xs: List[KeyValue], y: KeyValue) => y :: xs,
>>       (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys).persist()
>>     println("\n\n\nret.count = " + ret.count +
>>       ", partition size = " + ret.partitions.size)
>>
>>     // snippet 2: an array of *ImmutableBytesWritable* cannot be
>>     // serialized in a broadcast
>>     val arr = Array(
>>       new ImmutableBytesWritable(Bytes.toBytes(1)),
>>       new ImmutableBytesWritable(Bytes.toBytes(2)),
>>       new ImmutableBytesWritable(Bytes.toBytes(3)))
>>     val newPartitioner = new ArrayPartitioner(sc.broadcast(arr))
>>     val ret1 = rdd.aggregateByKey(List[KeyValue](), newPartitioner)(
>>       (xs: List[KeyValue], y: KeyValue) => y :: xs,
>>       (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys)
>>     println("\n\n\nret1.count = " + ret1.count)
>>
>>     sc.stop
>>
>>     // the following are the Kryo registrator and the partitioners
>>     class MyKryoRegistrator extends KryoRegistrator {
>>       override def registerClasses(kryo: Kryo): Unit = {
>>         kryo.register(classOf[ImmutableBytesWritable])        // register ImmutableBytesWritable
>>         kryo.register(classOf[Array[ImmutableBytesWritable]]) // register Array[ImmutableBytesWritable]
>>       }
>>     }
>>
>>     class SingleElementPartitioner(bc: Broadcast[ImmutableBytesWritable])
>>         extends Partitioner {
>>       override def numPartitions: Int = 5
>>       def v = Bytes.toInt(bc.value.get)
>>       override def getPartition(key: Any): Int = v - 1
>>     }
>>
>>     class ArrayPartitioner(bc: Broadcast[Array[ImmutableBytesWritable]])
>>         extends Partitioner {
>>       val arr = bc.value
>>       override def numPartitions: Int = arr.length
>>       override def getPartition(key: Any): Int = Bytes.toInt(arr(0).get)
>>     }
>>
>> In the code above, snippet 1 works as expected, but snippet 2 throws
>> "Task not serializable: java.io.NotSerializableException:
>> org.apache.hadoop.hbase.io.ImmutableBytesWritable".
>>
>> So do I have to implement a Kryo serializer for Array[T] if it is used
>> in a broadcast?
>>
>> Thanks
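
For what it's worth, a minimal sketch of one way to wire chill in (untested against 1.1.0; `ChillRegistrator` is a hypothetical name). The ClassCastException above is consistent with `WrappedArraySerializer` being a chill/Kryo serializer (a `KSerializer`) rather than an `org.apache.spark.serializer.Serializer`, so it cannot be named in `spark.serializer` directly; it would instead be registered with Kryo through a registrator:

    import com.esotericsoftware.kryo.Kryo
    import com.twitter.chill.WrappedArraySerializer
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.serializer.KryoRegistrator
    import scala.collection.mutable.WrappedArray

    // Hypothetical registrator: hook the chill serializer into Kryo
    // instead of naming it as "spark.serializer".
    class ChillRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        // WrappedArray.ofRef is the wrapper Scala puts around Array[AnyRef];
        // chill's own ScalaKryoInstantiator performs a similar registration.
        kryo.register(classOf[WrappedArray.ofRef[_]],
          new WrappedArraySerializer[AnyRef])
      }
    }

    val conf = new SparkConf()
      .setAppName("Test Serialization")
      // Spark's Kryo-backed serializer stays in "spark.serializer" ...
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // ... and the chill serializer comes in through the registrator.
      .set("spark.kryo.registrator", "xt.ChillRegistrator")
    val sc = new SparkContext(conf)

This assumes chill-scala is on the classpath and that `WrappedArraySerializer` has a no-arg constructor, as in the chill sources Ted linked; the exact registration chill performs may differ, so check `ScalaKryoInstantiator` in that repository.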