Our project is having a hard time working out what we are supposed to do to
migrate the following function from Spark 1.2 to 1.3:
/**
 * Dump matrix as computed Mahout's DRM into specified (HD)FS path
 * @param path
 */
def dfsWrite(path: String) = {
  val ktag = implicitly[ClassTag[K]]
  //val vtag = implicitly[ClassTag[Vector]]

  implicit val k2wFunc: (K) => Writable =
    if (ktag.runtimeClass == classOf[Int]) (x: K) => new IntWritable(x.asInstanceOf[Int])
    else if (ktag.runtimeClass == classOf[String]) (x: K) => new Text(x.asInstanceOf[String])
    else if (ktag.runtimeClass == classOf[Long]) (x: K) => new LongWritable(x.asInstanceOf[Long])
    else if (classOf[Writable].isAssignableFrom(ktag.runtimeClass)) (x: K) => x.asInstanceOf[Writable]
    else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(ktag))

  // the problem is here =====
  // this worked in Spark 1.2 and as we understand things should in 1.3 if we have the right implicits
  // rdd.saveAsSequenceFile(path)

  // this works in Spark 1.3 but uses a deprecated method
  SparkContext.rddToSequenceFileRDDFunctions(rdd.asInstanceOf[RDD[(K, Vector)]]).saveAsSequenceFile(path)
}
As we understand it, we now need to supply implicit writable factories instead
of writables? The rdd is a sequence of pairs where the key is one of the classes
above and the value is a Mahout "Vector". These are usually serialized through
Kryo (not JavaSerializer) for closures, so we have compatible classes for that.

Any pointers would be helpful.