Bumping this.  Does anyone have some familiarity with the py4j interface in
pyspark?

thanks


2015-04-27 22:09 GMT-07:00 Stephen Boesch <java...@gmail.com>:

>
> My intention is to add pyspark support for certain Spark MLlib methods.  I
> have been unable to resolve pickling errors of the form:
>
>    Pyspark py4j PickleException: “expected zero arguments for
> construction of ClassDict”
> <http://stackoverflow.com/questions/29910708/pyspark-py4j-pickleexception-expected-zero-arguments-for-construction-of-class>
>
> These occur during the Python-to-Java conversion of Python named tuples.
> The details are rather hard to convey inline, so I have created a Stack
> Overflow question:
>
>
> http://stackoverflow.com/questions/29910708/pyspark-py4j-pickleexception-expected-zero-arguments-for-construction-of-class
>
> In any case I have included the full text here. The Stack Overflow version
> is easier to read, though ;)
>
> --------------
>
> This question is directed at people familiar with py4j who can help to
> resolve a pickling error. I am trying to add a method to the pyspark
> PythonMLLibAPI that accepts an RDD of a namedtuple, does some work, and
> returns a result in the form of an RDD.
>
> This method is modeled after the PythonMLLibAPI.trainALSModel() method,
> whose analogous *existing* relevant portions are:
>
>   def trainALSModel(
>     ratingsJRDD: JavaRDD[Rating],
>     .. )
>
> The *existing* python Rating class used to model the new code is:
>
> class Rating(namedtuple("Rating", ["user", "product", "rating"])):
>     def __reduce__(self):
>         return Rating, (int(self.user), int(self.product), float(self.rating))
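>
> A side note on the mechanics, as I understand them (this sketch is my own,
> not from the Spark sources): __reduce__ makes pickle encode the object as
> "import this class and call it with these positional arguments" rather than
> as a generic attribute dict. The opcodes can be inspected locally:
>
> import pickle, pickletools
> from pyspark.mllib.recommendation import Rating
>
> # The dump ends in a REDUCE opcode: "call Rating with
> # (user, product, rating)". Whatever unpickles this on the JVM side
> # must know how to replay exactly that constructor call.
> pickletools.dis(pickle.dumps(Rating(1, 2, 5.0)))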
>
> Here is the attempt. First, the relevant classes:
>
> *New* python class pyspark.mllib.clustering.MatrixEntry:
>
> from collections import namedtuple
>
> class MatrixEntry(namedtuple("MatrixEntry", ["x", "y", "weight"])):
>     def __reduce__(self):
>         return MatrixEntry, (long(self.x), long(self.y), float(self.weight))
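>
> As far as I can tell this class itself is sound: it round-trips through
> pickle locally (the long() calls also pin this to Python 2):
>
> import pickle
> from pyspark.mllib.clustering import MatrixEntry  # the new class above
>
> m = MatrixEntry(1, 2, 3.5)
> assert pickle.loads(pickle.dumps(m)) == m  # fine on the Python side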
>
> *New* method *foobarRdd* in PythonMLLibAPI:
>
>   def foobarRdd(
>       data: JavaRDD[MatrixEntry]): RDD[FooBarResult] = {
>     val rdd = data.rdd.map { d =>
>       FooBarResult(d.i, d.j, d.value, d.i * 100 + d.j * 10 + d.value)
>     }
>     rdd
>   }
>
> Now let us try it out:
>
> from pyspark.mllib.clustering import MatrixEntry
>
> def convert_to_MatrixEntry(tup):
>     return MatrixEntry(*tup)
>
> from pyspark.mllib.clustering import *
> pic = PowerIterationClusteringModel(2)
> tups = [(1,2,3),(4,5,6),(12,13,14),(15,7,8),(16,17,16.5)]
> trdd = sc.parallelize(map(convert_to_MatrixEntry, tups))
>
> # print out the RDD on the python side just for validation
> print "%s" % (repr(trdd.collect()))
>
> from pyspark.mllib.common import callMLlibFunc
> pic = callMLlibFunc("foobar", trdd)
>
> Relevant portions of results:
>
> [(1,2)=3.0, (4,5)=6.0, (12,13)=14.0, (15,7)=8.0, (16,17)=16.5]
>
> which shows the input RDD arrives 'whole' on the Python side. However, the
> pickling on the JVM side was unhappy:
>
> 15/04/27 21:15:44 ERROR Executor: Exception in task 6.0 in stage 1.0 (TID 14)
> net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict(for pyspark.mllib.clustering.MatrixEntry)
>     at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
>     at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:617)
>     at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:170)
>     at net.razorvine.pickle.Unpickler.load(Unpickler.java:84)
>     at net.razorvine.pickle.Unpickler.loads(Unpickler.java:97)
>     at org.apache.spark.mllib.api.python.SerDe$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(PythonMLLibAPI.scala:1167)
>     at org.apache.spark.mllib.api.python.SerDe$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(PythonMLLibAPI.scala:1166)
>     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>     at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>     at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>     at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>     at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>     at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>     at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:819)
>     at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:819)
>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1523)
>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1523)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>     at org.apache.spark.scheduler.Task.run(Task.scala:64)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:212)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:724)
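>
> If I am reading this right, the unpickler on the JVM side
> (net.razorvine.pickle) has no constructor registered for
> pyspark.mllib.clustering.MatrixEntry, so it falls back to
> ClassDictConstructor, which accepts only zero arguments, while my
> __reduce__ hands it three. SerDe in PythonMLLibAPI.scala appears to
> register constructors only for the classes it already knows (LabeledPoint,
> Rating, the vector and matrix types). As a control experiment (my own
> hedged sketch, untested): plain tuples carry no class name, so they should
> unpickle JVM-side as generic arrays and never reach ClassDictConstructor:
>
> from pyspark.mllib.common import _py2java
>
> # Hypothetical control: strip the MatrixEntry wrapper before conversion.
> # If this count() succeeds while the MatrixEntry version fails, the
> # missing piece is a JVM-side constructor registration, not the Python
> # class definition.
> plain = trdd.map(lambda e: (e.x, e.y, e.weight))
> _py2java(sc, plain).count()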
>
>
> Here is the Python invocation stack trace:
>
> Py4JJavaError                             Traceback (most recent call last)
> <ipython-input-2-3589950a5c09> in <module>()
>      12
>      13 from pyspark.mllib.common import callMLlibFunc
> ---> 14 pic = callMLlibFunc("foobar", trdd)
>
> /shared/picpy/python/pyspark/mllib/common.pyc in callMLlibFunc(name, *args)
>     119     sc = SparkContext._active_spark_context
>     120     api = getattr(sc._jvm.PythonMLLibAPI(), name)
> --> 121     return callJavaFunc(sc, api, *args)
>     122
>     123
>
> /shared/picpy/python/pyspark/mllib/common.pyc in callJavaFunc(sc, func, *args)
>     112     """ Call Java Function """
>     113     args = [_py2java(sc, a) for a in args]
> --> 114     return _java2py(sc, func(*args))
>     115
>     116
>
> /Library/Python/2.7/site-packages/py4j-0.8.2.1-py2.7.egg/py4j/java_gateway.pyc in __call__(self, *args)
>     536         answer = self.gateway_client.send_command(command)
>     537         return_value = get_return_value(answer, self.gateway_client,
> --> 538                 self.target_id, self.name)
>     539
>     540         for temp_arg in temp_args:
>
> /Library/Python/2.7/site-packages/py4j-0.8.2.1-py2.7.egg/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
>     298                 raise Py4JJavaError(
>     299                     'An error occurred while calling {0}{1}{2}.\n'.
> --> 300                     format(target_id, '.', name), value)
>     301             else:
>     302                 raise Py4JError(
>
> Py4JJavaError: An error occurred while calling o31.foobar.
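>
> One further isolation step I am considering (again a hedged sketch:
> callJavaFunc converts each argument with _py2java, per the frames above,
> so the failing conversion should be reproducible without "foobar" ever
> being called):
>
> from pyspark.mllib.common import _py2java
>
> # Forcing the converted JavaRDD with count() should raise the same
> # PickleException on the executors, which would show the new Scala
> # method itself is not at fault.
> _py2java(sc, trdd).count()
>
> Any pointers on where the JVM side learns to construct Python classes
> would be much appreciated.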
>
