Bumping this. Is anyone here familiar with the py4j interface in pyspark?
thanks

2015-04-27 22:09 GMT-07:00 Stephen Boesch <java...@gmail.com>:

> My intention is to add pyspark support for certain mllib spark methods. I
> have been unable to resolve pickling errors of the form
>
>     Pyspark py4j PickleException: "expected zero arguments for
>     construction of ClassDict"
>
> These occur during the Python-to-Java conversion of Python namedtuples.
> The details are rather hard to provide here, so I have created an SOF
> question:
>
> http://stackoverflow.com/questions/29910708/pyspark-py4j-pickleexception-expected-zero-arguments-for-construction-of-class
>
> In any case I have included the text here. The SOF is easier to read
> though ;)
>
> --------------
>
> This question is directed towards persons familiar with py4j who can
> help to resolve a pickling error. I am trying to add a method to the
> pyspark PythonMLLibAPI that accepts an RDD of a namedtuple, does some
> work, and returns a result in the form of an RDD.
>
> This method is modeled after the PythonMLLibAPI.trainALSModel() method,
> whose analogous *existing* relevant portions are:
>
>     def trainALSModel(
>         ratingsJRDD: JavaRDD[Rating],
>         .. )
>
> The *existing* python Rating class used to model the new code is:
>
>     class Rating(namedtuple("Rating", ["user", "product", "rating"])):
>         def __reduce__(self):
>             return Rating, (int(self.user), int(self.product), float(self.rating))
>
> So here are the relevant new classes:
>
> *New* python class pyspark.mllib.clustering.MatrixEntry:
>
>     from collections import namedtuple
>
>     class MatrixEntry(namedtuple("MatrixEntry", ["x", "y", "weight"])):
>         def __reduce__(self):
>             return MatrixEntry, (long(self.x), long(self.y), float(self.weight))
>
> *New* method *foobar* in PythonMLLibAPI:
>
>     def foobar(
>         data: JavaRDD[MatrixEntry]): RDD[FooBarResult] = {
>       val rdd = data.rdd.map { d =>
>         FooBarResult(d.i, d.j, d.value, d.i * 100 + d.j * 10 + d.value)
>       }
>       rdd
>     }
>
> Now let us try it out:
>
>     from pyspark.mllib.clustering import MatrixEntry
>
>     def convert_to_MatrixEntry(tup):
>         return MatrixEntry(*tup)
>
>     from pyspark.mllib.clustering import *
>     pic = PowerIterationClusteringModel(2)
>     tups = [(1,2,3),(4,5,6),(12,13,14),(15,7,8),(16,17,16.5)]
>     trdd = sc.parallelize(map(convert_to_MatrixEntry, tups))
>
>     # print out the RDD on the python side just for validation
>     print "%s" % repr(trdd.collect())
>
>     from pyspark.mllib.common import callMLlibFunc
>     pic = callMLlibFunc("foobar", trdd)
>
> Relevant portion of the results:
>
>     [(1,2)=3.0, (4,5)=6.0, (12,13)=14.0, (15,7)=8.0, (16,17)=16.5]
>
> which shows the input RDD is 'whole'.
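A data point worth adding while bumping: the namedtuple itself round-trips cleanly through pyspark's own Python-side pickler, so __reduce__ appears to be fine and the breakage shown below happens in the JVM-side unpickler (Pyrolite). A minimal check, assuming the MatrixEntry class above is importable:

    from pyspark.serializers import PickleSerializer
    from pyspark.mllib.clustering import MatrixEntry

    ser = PickleSerializer()
    entry = MatrixEntry(1, 2, 3.0)
    # CPython's pickle honors __reduce__, so this reconstructs without complaint;
    # the "expected zero arguments" error surfaces only in the JVM unpickler.
    assert ser.loads(ser.dumps(entry)) == entry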
> However the pickling was unhappy:
>
>     15/04/27 21:15:44 ERROR Executor: Exception in task 6.0 in stage 1.0 (TID 14)
>     net.razorvine.pickle.PickleException: expected zero arguments for
>     construction of ClassDict (for pyspark.mllib.clustering.MatrixEntry)
>         at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
>         at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:617)
>         at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:170)
>         at net.razorvine.pickle.Unpickler.load(Unpickler.java:84)
>         at net.razorvine.pickle.Unpickler.loads(Unpickler.java:97)
>         at org.apache.spark.mllib.api.python.SerDe$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(PythonMLLibAPI.scala:1167)
>         at org.apache.spark.mllib.api.python.SerDe$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(PythonMLLibAPI.scala:1166)
>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>         at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>         at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>         at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>         at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>         at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>         at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>         at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:819)
>         at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:819)
>         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1523)
>         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1523)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:212)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:724)
>
> Here is the python invocation stack trace:
>
>     Py4JJavaError                             Traceback (most recent call last)
>     <ipython-input-2-3589950a5c09> in <module>()
>          12
>          13 from pyspark.mllib.common import callMLlibFunc
>     ---> 14 pic = callMLlibFunc("foobar", trdd)
>
>     /shared/picpy/python/pyspark/mllib/common.pyc in callMLlibFunc(name, *args)
>         119     sc = SparkContext._active_spark_context
>         120     api = getattr(sc._jvm.PythonMLLibAPI(), name)
>     --> 121     return callJavaFunc(sc, api, *args)
>         122
>         123
>
>     /shared/picpy/python/pyspark/mllib/common.pyc in callJavaFunc(sc, func, *args)
>         112     """ Call Java Function """
>         113     args = [_py2java(sc, a) for a in args]
>     --> 114     return _java2py(sc, func(*args))
>         115
>         116
>
>     /Library/Python/2.7/site-packages/py4j-0.8.2.1-py2.7.egg/py4j/java_gateway.pyc in __call__(self, *args)
>         536         answer = self.gateway_client.send_command(command)
>         537         return_value = get_return_value(answer, self.gateway_client,
>     --> 538                 self.target_id, self.name)
>         539
>         540         for temp_arg in temp_args:
>     /Library/Python/2.7/site-packages/py4j-0.8.2.1-py2.7.egg/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
>         298                 raise Py4JJavaError(
>         299                     'An error occurred while calling {0}{1}{2}.\n'.
>     --> 300                     format(target_id, '.', name), value)
>         301             else:
>         302                 raise Py4JError(
>
>     Py4JJavaError: An error occurred while calling o31.foobar.
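For anyone landing on this later: as far as I can tell, Rating survives this same path only because the SerDe object in PythonMLLibAPI registers custom Pyrolite picklers/constructors for it (as it does for Vector, LabeledPoint, and a few others), while MatrixEntry has no such registration. Pyrolite therefore falls back to ClassDictConstructor, which only supports zero-argument construction, hence the exception. Short of adding that registration on the Scala side, one workaround sketch (untested) is to ship plain tuples across the boundary, since Pyrolite handles those natively; the Scala method would then have to unpack the generic tuple fields itself:

    # Hypothetical workaround: flatten each MatrixEntry to a plain tuple before
    # the py4j call, so Pyrolite never needs to construct a MatrixEntry ClassDict.
    plain_trdd = trdd.map(lambda e: (long(e.x), long(e.y), float(e.weight)))
    pic = callMLlibFunc("foobar", plain_trdd)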