I'm using Spark 1.1.0 and find that *ImmutableBytesWritable* can be
serialized by Kryo but *Array[ImmutableBytesWritable] *can't be serialized
even when I registered both of them in Kryo.
The code is as follows:
val conf = new SparkConf()
.setAppName("Hello Spark")
.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "xt.MyKryoRegistrator")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(List(
(new ImmutableBytesWritable(Bytes.toBytes("AAA")), new
KeyValue()),
(new ImmutableBytesWritable(Bytes.toBytes("BBB")), new
KeyValue()),
(new ImmutableBytesWritable(Bytes.toBytes("CCC")), new
KeyValue()),
(new ImmutableBytesWritable(Bytes.toBytes("DDD")), new
KeyValue())), 4)
// snippet 1: a single object of *ImmutableBytesWritable* can be
serialized in broadcast
val partitioner = new SingleElementPartitioner(sc.broadcast(new
ImmutableBytesWritable(Bytes.toBytes(3))))
val ret = rdd.aggregateByKey(List[KeyValue](),
partitioner)((xs:List[KeyValue], y:KeyValue) => y::xs,
(xs:List[KeyValue], ys:List[KeyValue]) => xs:::ys ).persist()
println("\n\n\ret.count = " + ret.count + ", partition size = " +
ret.partitions.size)
// snippet 2: an array of *ImmutableBytesWritable* can not be
serialized in broadcast
val arr = Array(new ImmutableBytesWritable(Bytes.toBytes(1)), new
ImmutableBytesWritable(Bytes.toBytes(2)), new
ImmutableBytesWritable(Bytes.toBytes(3)))
val newPartitioner = new ArrayPartitioner(sc.broadcast(arr))
val ret1 = rdd.aggregateByKey(List[KeyValue](),
newPartitioner)((xs:List[KeyValue], y:KeyValue) => y::xs,
(xs:List[KeyValue], ys:List[KeyValue]) => xs:::ys )
println("\n\n\nrdd2.count = " + ret1.count)
sc.stop
// the following are kryo registrator and partitioners
class MyKryoRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo): Unit = {
kryo.register(classOf[ImmutableBytesWritable]) //
register ImmutableBytesWritable
kryo.register(classOf[Array[ImmutableBytesWritable]])
// register
Array[ImmutableBytesWritable]
}
}
class SingleElementPartitioner(bc:
Broadcast[ImmutableBytesWritable]) extends Partitioner {
override def numPartitions: Int = 5
def v = Bytes.toInt(bc.value.get)
override def getPartition(key: Any): Int = v - 1
}
class ArrayPartitioner(bc:
Broadcast[Array[ImmutableBytesWritable]]) extends Partitioner {
val arr = bc.value
override def numPartitions: Int = arr.length
override def getPartition(key: Any): Int =
Bytes.toInt(arr(0).get)
}
In the code above, snippet 1 can work as expected. But snippet 2 throws
"Task not serializable: java.io.NotSerializableException:
org.apache.hadoop.hbase.io.ImmutableBytesWritable" .
So do I have to implement a Kryo serializer for Array[T] if it is used in
broadcast ?
Thanks