Thanks Ted

After searching for a whole day, I still don't know how to let spark use
twitter chill serialization - there are very few documents about how to
integrate twitter chill into Spark for serialization. I tried the
following, but an exception of "java.lang.ClassCastException:
com.twitter.chill.WrappedArraySerializer cannot be cast to
org.apache.spark.serializer.Serializer" was thrown:

        val conf = new SparkConf()
                   .setAppName("Test Serialization")
                   .set("spark.serializer",
"com.twitter.chill.WrappedArraySerializer")


Well, what is the correct way of configuring Spark to use the twitter chill
serialization framework ?







2015-02-15 22:23 GMT+08:00 Ted Yu <yuzhih...@gmail.com>:

> I was looking at https://github.com/twitter/chill
>
> It seems this would achieve what you want:
> chill-scala/src/main/scala/com/twitter/chill/WrappedArraySerializer.scala
>
> Cheers
>
> On Sat, Feb 14, 2015 at 6:36 PM, Tao Xiao <xiaotao.cs....@gmail.com>
> wrote:
>
>> I'm using Spark 1.1.0 and find that *ImmutableBytesWritable* can be
>> serialized by Kryo but *Array[ImmutableBytesWritable] *can't be
>> serialized even when I registered both of them in Kryo.
>>
>> The code is as follows:
>>
>>        val conf = new SparkConf()
>>                 .setAppName("Hello Spark")
>>                 .set("spark.serializer",
>> "org.apache.spark.serializer.KryoSerializer")
>>                 .set("spark.kryo.registrator", "xt.MyKryoRegistrator")
>>
>>         val sc = new SparkContext(conf)
>>
>>         val rdd = sc.parallelize(List(
>>                     (new ImmutableBytesWritable(Bytes.toBytes("AAA")),
>> new KeyValue()),
>>                     (new ImmutableBytesWritable(Bytes.toBytes("BBB")),
>> new KeyValue()),
>>                     (new ImmutableBytesWritable(Bytes.toBytes("CCC")),
>> new KeyValue()),
>>                     (new ImmutableBytesWritable(Bytes.toBytes("DDD")),
>> new KeyValue())), 4)
>>
>>         // snippet 1:  a single object of *ImmutableBytesWritable* can
>> be serialized in broadcast
>>         val partitioner = new SingleElementPartitioner(sc.broadcast(new
>> ImmutableBytesWritable(Bytes.toBytes(3))))
>>         val ret = rdd.aggregateByKey(List[KeyValue](),
>> partitioner)((xs:List[KeyValue], y:KeyValue) => y::xs,
>>  (xs:List[KeyValue], ys:List[KeyValue]) => xs:::ys ).persist()
>>         println("\n\n\ret.count = " + ret.count + ",  partition size = "
>> + ret.partitions.size)
>>
>>         // snippet 2: an array of *ImmutableBytesWritable* can not be
>> serialized in broadcast
>>         val arr = Array(new ImmutableBytesWritable(Bytes.toBytes(1)), new
>> ImmutableBytesWritable(Bytes.toBytes(2)), new
>> ImmutableBytesWritable(Bytes.toBytes(3)))
>>         val newPartitioner = new ArrayPartitioner(sc.broadcast(arr))
>>         val ret1 = rdd.aggregateByKey(List[KeyValue](),
>> newPartitioner)((xs:List[KeyValue], y:KeyValue) => y::xs,
>>  (xs:List[KeyValue], ys:List[KeyValue]) => xs:::ys )
>>         println("\n\n\nrdd2.count = " + ret1.count)
>>
>>         sc.stop
>>
>>
>>       // the following are kryo registrator and partitioners
>>        class MyKryoRegistrator extends KryoRegistrator {
>>             override def registerClasses(kryo: Kryo): Unit = {
>>                  kryo.register(classOf[ImmutableBytesWritable])   //
>> register ImmutableBytesWritable
>>                  kryo.register(classOf[Array[ImmutableBytesWritable]])
>>  // register Array[ImmutableBytesWritable]
>>             }
>>        }
>>
>>        class SingleElementPartitioner(bc:
>> Broadcast[ImmutableBytesWritable]) extends Partitioner {
>>             override def numPartitions: Int = 5
>>             def v = Bytes.toInt(bc.value.get)
>>             override def getPartition(key: Any): Int =  v - 1
>>        }
>>
>>
>>         class ArrayPartitioner(bc:
>> Broadcast[Array[ImmutableBytesWritable]]) extends Partitioner {
>>             val arr = bc.value
>>             override def numPartitions: Int = arr.length
>>             override def getPartition(key: Any): Int =
>> Bytes.toInt(arr(0).get)
>>         }
>>
>>
>>
>> In the code above, snippet 1 can work as expected. But snippet 2 throws
>> "Task not serializable: java.io.NotSerializableException:
>> org.apache.hadoop.hbase.io.ImmutableBytesWritable"  .
>>
>>
>> So do I have to implement a Kryo serializer for Array[T] if it is used in
>> broadcast ?
>>
>> Thanks
>>
>>
>>
>>
>>
>

Reply via email to