Hi Stephen,

I think it would be easier to see what you implemented if you shared the branch diff link on GitHub. There are a couple of utility classes that make Rating work between Scala and Python:
1. serializer: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala#L1163
2. mark it as picklable: https://github.com/apache/spark/blob/master/python/pyspark/mllib/common.py#L56

However, I don't recommend following this approach. It is much simpler to use DataFrames for serialization. You can find an example here:

https://github.com/apache/spark/blob/master/python/pyspark/mllib/evaluation.py#L23

Best,
Xiangrui

On Thu, Apr 30, 2015 at 12:07 PM, Stephen Boesch <java...@gmail.com> wrote:
> Bumping this. Does anyone here have some familiarity with the py4j interface in
> pyspark?
>
> thanks
>
> 2015-04-27 22:09 GMT-07:00 Stephen Boesch <java...@gmail.com>:
>
>> My intention is to add pyspark support for certain mllib spark methods. I
>> have been unable to resolve pickling errors of the form
>>
>> Pyspark py4j PickleException: "expected zero arguments for construction of ClassDict"
>> <http://stackoverflow.com/questions/29910708/pyspark-py4j-pickleexception-expected-zero-arguments-for-construction-of-class>
>>
>> These are occurring during python-to-java conversion of python named
>> tuples. The details are rather hard to provide here, so I have created an
>> SOF question:
>>
>> http://stackoverflow.com/questions/29910708/pyspark-py4j-pickleexception-expected-zero-arguments-for-construction-of-class
>>
>> In any case, I have included the text here. The SOF is easier to read
>> though ;)
>>
>> --------------
>>
>> This question is directed towards persons familiar with py4j who can
>> help to resolve a pickling error. I am trying to add a method to the
>> pyspark PythonMLLibAPI that accepts an RDD of a namedtuple, does some work,
>> and returns a result in the form of an RDD.
>>
>> This method is modeled after the PythonMLLibAPI.trainALSModel() method,
>> whose analogous *existing* relevant portions are:
>>
>>     def trainALSModel(
>>         ratingsJRDD: JavaRDD[Rating],
>>         .. )
>>
>> The *existing* python Rating class used to model the new code is:
>>
>>     class Rating(namedtuple("Rating", ["user", "product", "rating"])):
>>         def __reduce__(self):
>>             return Rating, (int(self.user), int(self.product), float(self.rating))
>>
>> Here is the attempt. The relevant classes are:
>>
>> *New* python class pyspark.mllib.clustering.MatrixEntry:
>>
>>     from collections import namedtuple
>>
>>     class MatrixEntry(namedtuple("MatrixEntry", ["x", "y", "weight"])):
>>         def __reduce__(self):
>>             return MatrixEntry, (long(self.x), long(self.y), float(self.weight))
>>
>> *New* method *foobarRdd* in PythonMLLibAPI:
>>
>>     def foobarRdd(
>>         data: JavaRDD[MatrixEntry]): RDD[FooBarResult] = {
>>       val rdd = data.rdd.map { d =>
>>         FooBarResult(d.i, d.j, d.value, d.i * 100 + d.j * 10 + d.value)
>>       }
>>       rdd
>>     }
>>
>> Now let us try it out:
>>
>>     from pyspark.mllib.clustering import MatrixEntry
>>
>>     def convert_to_MatrixEntry(tuple):
>>         return MatrixEntry(*tuple)
>>
>>     from pyspark.mllib.clustering import *
>>     pic = PowerIterationClusteringModel(2)
>>     tups = [(1,2,3),(4,5,6),(12,13,14),(15,7,8),(16,17,16.5)]
>>     trdd = sc.parallelize(map(convert_to_MatrixEntry, tups))
>>
>>     # print out the RDD on the python side just for validation
>>     print "%s" % (repr(trdd.collect()))
>>
>>     from pyspark.mllib.common import callMLlibFunc
>>     pic = callMLlibFunc("foobar", trdd)
>>
>> Relevant portions of results:
>>
>>     [(1,2)=3.0, (4,5)=6.0, (12,13)=14.0, (15,7)=8.0, (16,17)=16.5]
>>
>> which shows the input rdd is 'whole'.
>> However the pickling was unhappy:
>>
>>     5/04/27 21:15:44 ERROR Executor: Exception in task 6.0 in stage 1.0 (TID 14)
>>     net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict(for pyspark.mllib.clustering.MatrixEntry)
>>         at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
>>         at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:617)
>>         at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:170)
>>         at net.razorvine.pickle.Unpickler.load(Unpickler.java:84)
>>         at net.razorvine.pickle.Unpickler.loads(Unpickler.java:97)
>>         at org.apache.spark.mllib.api.python.SerDe$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(PythonMLLibAPI.scala:1167)
>>         at org.apache.spark.mllib.api.python.SerDe$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(PythonMLLibAPI.scala:1166)
>>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>         at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>         at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>>         at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>>         at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>         at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>         at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>         at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>         at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:819)
>>         at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:819)
>>         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1523)
>>         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1523)
>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:212)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:724)
>>
>> Here is the python invocation stack trace:
>>
>>     Py4JJavaError                             Traceback (most recent call last)
>>     <ipython-input-2-3589950a5c09> in <module>()
>>          12
>>          13 from pyspark.mllib.common import callMLlibFunc
>>     ---> 14 pic = callMLlibFunc("foobar", trdd)
>>
>>     /shared/picpy/python/pyspark/mllib/common.pyc in callMLlibFunc(name, *args)
>>         119     sc = SparkContext._active_spark_context
>>         120     api = getattr(sc._jvm.PythonMLLibAPI(), name)
>>     --> 121     return callJavaFunc(sc, api, *args)
>>         122
>>         123
>>
>>     /shared/picpy/python/pyspark/mllib/common.pyc in callJavaFunc(sc, func, *args)
>>         112     """ Call Java Function """
>>         113     args = [_py2java(sc, a) for a in args]
>>     --> 114     return _java2py(sc, func(*args))
>>         115
>>         116
>>
>>     /Library/Python/2.7/site-packages/py4j-0.8.2.1-py2.7.egg/py4j/java_gateway.pyc in __call__(self, *args)
>>         536         answer = self.gateway_client.send_command(command)
>>         537         return_value = get_return_value(answer, self.gateway_client,
>>     --> 538                 self.target_id, self.name)
>>         539
>>         540         for temp_arg in temp_args:
>>
>>     /Library/Python/2.7/site-packages/py4j-0.8.2.1-py2.7.egg/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
>>         298                 raise Py4JJavaError(
>>         299                     'An error occurred while calling {0}{1}{2}.\n'.
>>     --> 300                     format(target_id, '.', name), value)
>>         301             else:
>>         302                 raise Py4JError(
>>
>>     Py4JJavaError: An error occurred while calling o31.foobar.
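
For reference, below is a minimal sketch of the DataFrame route Xiangrui recommends above, modeled loosely on the evaluation.py example he links. The call_foobar_via_dataframe helper, the exact schema, and the assumption that the Scala-side "foobar" entry point is rewritten to accept a DataFrame (rebuilding MatrixEntry values from Rows itself) are all illustrative, not actual Spark APIs; only SQLContext, createDataFrame, and callMLlibFunc are existing pieces.

    from pyspark.sql import SQLContext
    from pyspark.sql.types import StructType, StructField, LongType, DoubleType
    from pyspark.mllib.common import callMLlibFunc

    def call_foobar_via_dataframe(sc, matrix_entry_rdd):
        """Ship an RDD of (x, y, weight) namedtuples to the JVM as a DataFrame.

        The DataFrame carries an explicit schema, so no custom serializer has
        to be registered in PythonMLLibAPI.scala and nothing has to be marked
        picklable in common.py.
        """
        sql_ctx = SQLContext(sc)
        schema = StructType([
            StructField("x", LongType(), nullable=False),
            StructField("y", LongType(), nullable=False),
            StructField("weight", DoubleType(), nullable=False)])
        df = sql_ctx.createDataFrame(matrix_entry_rdd, schema)
        # 'foobar' is the hypothetical Scala entry point from the question;
        # a DataFrame-based version of it would take a DataFrame parameter
        # instead of JavaRDD[MatrixEntry]. df._jdf hands the underlying Java
        # DataFrame across the py4j gateway.
        return callMLlibFunc("foobar", df._jdf)

The idea is that the data crosses the Python/JVM boundary as a DataFrame rather than as pickled namedtuples, which sidesteps the "expected zero arguments for construction of ClassDict" unpickling failure entirely.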