Hi Stephen,

I think it would be easier to see what you implemented if you shared the
branch diff link on GitHub. There are a couple of utility classes that make
Rating work between Scala and Python:

1. serializer: 
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala#L1163
2. mark it as picklable:
https://github.com/apache/spark/blob/master/python/pyspark/mllib/common.py#L56
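
To illustrate why both pieces are needed, here is a minimal pure-Python
sketch (no Spark required). It shows what `__reduce__` records for a
namedtuple subclass: a constructor plus three arguments. The
`fallback_construct` helper is hypothetical; it only imitates the behaviour
suggested by the stack trace in the quoted message, where a class with no
registered constructor on the JVM side is handed to a generic fallback that
accepts zero arguments. The class below uses `int` instead of `long` so it
runs on Python 3.

```python
from collections import namedtuple


class MatrixEntry(namedtuple("MatrixEntry", ["x", "y", "weight"])):
    """Stand-in for the Python class in the question."""

    def __reduce__(self):
        # pickle records "call MatrixEntry with these three arguments".
        return MatrixEntry, (int(self.x), int(self.y), float(self.weight))


def fallback_construct(*args):
    """Hypothetical imitation of a fallback that only accepts zero arguments."""
    if args:
        raise ValueError("expected zero arguments for construction of ClassDict")
    return {}


entry = MatrixEntry(1, 2, 3)
constructor, args = entry.__reduce__()

# With the real class available, the recorded call rebuilds the object.
rebuilt = constructor(*args)

# Without a registered constructor, the same arguments reach the fallback
# and are rejected.
try:
    fallback_construct(*args)
    fallback_error = None
except ValueError as exc:
    fallback_error = str(exc)
```

If this reading is right, registering a constructor for the class on the
Scala side (item 1) is what lets the recorded arguments be consumed.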

However, I don't recommend following this approach. It is much
simpler to use DataFrames for serialization. You can find an example
here:

https://github.com/apache/spark/blob/master/python/pyspark/mllib/evaluation.py#L23
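
As a rough illustration of why rows of primitive values are easier to move
across the boundary, the sketch below pickles plain tuples and inspects the
opcodes in the stream. This is not the DataFrame API itself, only standard
library `pickle` and `pickletools`, and the sample rows are made up. The
point is that the stream contains no class lookup and no constructor call,
so nothing has to be registered on the receiving side.

```python
import pickle
import pickletools

# Hypothetical sample rows: plain tuples of int/int/float, no custom class.
rows = [(1, 2, 3.0), (4, 5, 6.0), (16, 17, 16.5)]

payload = pickle.dumps(rows, protocol=2)

# Collect the names of the opcodes used in the pickle stream.
opcodes = {op.name for op, arg, pos in pickletools.genops(payload)}

# GLOBAL / STACK_GLOBAL look up a class; REDUCE calls a constructor.
needs_class = bool(opcodes & {"GLOBAL", "STACK_GLOBAL", "REDUCE"})

restored = pickle.loads(payload)
```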

Best,
Xiangrui

On Thu, Apr 30, 2015 at 12:07 PM, Stephen Boesch <java...@gmail.com> wrote:
> Bumping this.  Does anyone have some familiarity with the py4j interface in
> pyspark?
>
> thanks
>
>
> 2015-04-27 22:09 GMT-07:00 Stephen Boesch <java...@gmail.com>:
>
>>
>> My intention is to add pyspark support for certain mllib spark methods.  I
>> have been unable to resolve pickling errors of the form
>>
>>    Pyspark py4j PickleException: “expected zero arguments for
>> construction of ClassDict”
>> <http://stackoverflow.com/questions/29910708/pyspark-py4j-pickleexception-expected-zero-arguments-for-construction-of-class>
>>
>> These are occurring during python to java conversion of python named
>> tuples.  The details are rather hard to provide here so I have created an
>> SOF question
>>
>>
>> http://stackoverflow.com/questions/29910708/pyspark-py4j-pickleexception-expected-zero-arguments-for-construction-of-class
>>
>> In any case I have included the text here. The SOF is easier to read
>> though ;)
>>
>> --------------
>>
>> This question is directed towards persons familiar with py4j who can
>> help to resolve a pickling error. I am trying to add a method to the
>> pyspark PythonMLLibAPI that accepts an RDD of a namedtuple, does some work,
>> and returns a result in the form of an RDD.
>>
>> This method is modeled after the PythonMLLibAPI.trainALSModel() method,
>> whose analogous *existing* relevant portions are:
>>
>>   def trainALSModel(
>>     ratingsJRDD: JavaRDD[Rating],
>>     .. )
>>
>> The *existing* python Rating class used to model the new code is:
>>
>> class Rating(namedtuple("Rating", ["user", "product", "rating"])):
>>     def __reduce__(self):
>>         return Rating, (int(self.user), int(self.product), float(self.rating))
>>
>> Here is the attempt. The relevant classes are:
>>
>> *New* python class pyspark.mllib.clustering.MatrixEntry:
>>
>> from collections import namedtuple
>> class MatrixEntry(namedtuple("MatrixEntry", ["x","y","weight"])):
>>     def __reduce__(self):
>>         return MatrixEntry, (long(self.x), long(self.y), float(self.weight))
>>
>> *New* method *foobarRdd* in PythonMLLibAPI:
>>
>>   def foobarRdd(
>>     data: JavaRDD[MatrixEntry]): RDD[FooBarResult] = {
>>     val rdd = data.rdd.map { d => FooBarResult(d.i, d.j, d.value, d.i * 100 + d.j * 10 + d.value)}
>>     rdd
>>   }
>>
>> Now let us try it out:
>>
>> from pyspark.mllib.clustering import MatrixEntry
>> def convert_to_MatrixEntry(tuple):
>>   return MatrixEntry(*tuple)
>> from pyspark.mllib.clustering import *
>> pic = PowerIterationClusteringModel(2)
>> tups = [(1,2,3),(4,5,6),(12,13,14),(15,7,8),(16,17,16.5)]
>> trdd = sc.parallelize(map(convert_to_MatrixEntry,tups))
>> # print out the RDD on python side just for validation
>> print "%s" %(repr(trdd.collect()))
>> from pyspark.mllib.common import callMLlibFunc
>> pic = callMLlibFunc("foobar", trdd)
>>
>> Relevant portions of results:
>>
>> [(1,2)=3.0, (4,5)=6.0, (12,13)=14.0, (15,7)=8.0, (16,17)=16.5]
>>
>> which shows the input rdd is 'whole'. However the pickling was unhappy:
>>
>> 15/04/27 21:15:44 ERROR Executor: Exception in task 6.0 in stage 1.0 (TID 14)
>> net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict(for pyspark.mllib.clustering.MatrixEntry)
>>     at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
>>     at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:617)
>>     at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:170)
>>     at net.razorvine.pickle.Unpickler.load(Unpickler.java:84)
>>     at net.razorvine.pickle.Unpickler.loads(Unpickler.java:97)
>>     at org.apache.spark.mllib.api.python.SerDe$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(PythonMLLibAPI.scala:1167)
>>     at org.apache.spark.mllib.api.python.SerDe$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(PythonMLLibAPI.scala:1166)
>>     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>>     at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>>     at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>     at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>     at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>     at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>     at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:819)
>>     at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:819)
>>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1523)
>>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1523)
>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>     at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:212)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>     at java.lang.Thread.run(Thread.java:724)
>>
>>
>> Here is the python invocation stack trace:
>>
>> Py4JJavaError                             Traceback (most recent call last)
>> <ipython-input-2-3589950a5c09> in <module>()
>>      12
>>      13 from pyspark.mllib.common import callMLlibFunc
>> ---> 14 pic = callMLlibFunc("foobar", trdd)
>>
>> /shared/picpy/python/pyspark/mllib/common.pyc in callMLlibFunc(name, *args)
>>     119     sc = SparkContext._active_spark_context
>>     120     api = getattr(sc._jvm.PythonMLLibAPI(), name)
>> --> 121     return callJavaFunc(sc, api, *args)
>>     122
>>     123
>>
>> /shared/picpy/python/pyspark/mllib/common.pyc in callJavaFunc(sc, func, *args)
>>     112     """ Call Java Function """
>>     113     args = [_py2java(sc, a) for a in args]
>> --> 114     return _java2py(sc, func(*args))
>>     115
>>     116
>>
>> /Library/Python/2.7/site-packages/py4j-0.8.2.1-py2.7.egg/py4j/java_gateway.pyc in __call__(self, *args)
>>     536         answer = self.gateway_client.send_command(command)
>>     537         return_value = get_return_value(answer, self.gateway_client,
>> --> 538                 self.target_id, self.name)
>>     539
>>     540         for temp_arg in temp_args:
>>
>> /Library/Python/2.7/site-packages/py4j-0.8.2.1-py2.7.egg/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
>>     298                 raise Py4JJavaError(
>>     299                     'An error occurred while calling {0}{1}{2}.\n'.
>> --> 300                     format(target_id, '.', name), value)
>>     301             else:
>>     302                 raise Py4JError(
>>
>> Py4JJavaError: An error occurred while calling o31.foobar.
>>
>>
>>
>>
>>
