The problem seems to be that unpicklable RDD objects are being pulled into
function closures.  In your failing doctests, it looks like the rdd created
through sc.parallelize is being pulled into the map lambda's function closure.
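For illustration, here's a minimal sketch of the kind of capture I mean; the
names `sc` and `rdd` are just stand-ins for an active SparkContext and an RDD
built from it, not your actual failing doctest:

    from pyspark import SparkContext

    sc = SparkContext("local", "closure-capture-sketch")
    rdd = sc.parallelize([1, 2, 3])

    # The lambda below only uses its argument, but `rdd` and `sc` live in the
    # enclosing module's globals dict. A serializer that pickles a function
    # together with its globals (as dill can do for functions defined in
    # __main__) can end up trying to pickle the RDD and the SparkContext
    # themselves, and neither is picklable.
    doubled = rdd.map(lambda x: 2 * x)
    print(doubled.collect())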

I opened a new Dill bug with a small test case that reproduces this issue: 
https://github.com/uqfoundation/dill/issues/50.

I tried manually modifying dill to drop `rdd` and `sc` variables from function 
globals dicts, which seemed to solve the problem, but that’s not a 
general-purpose fix.
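Roughly the shape of that hack, sketched standalone rather than as the actual
change inside dill (the names here are illustrative only):

    import dill

    UNPICKLABLE_NAMES = ("rdd", "sc")

    def dumps_without_context(func, protocol=2):
        # Temporarily drop known-unpicklable entries from the function's
        # globals dict, dump it, then put them back.
        func_globals = getattr(func, "__globals__", None) or func.func_globals
        removed = {}
        for name in UNPICKLABLE_NAMES:
            if name in func_globals:
                removed[name] = func_globals.pop(name)
        try:
            return dill.dumps(func, protocol)
        finally:
            func_globals.update(removed)

That only papers over this particular doctest because the offending globals
happen to be named `rdd` and `sc`; any other unpicklable object sitting in the
globals dict would break things again.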
On June 25, 2014 at 12:56:10 PM, Mark Baker (dist...@acm.org) wrote:

Hey,  

On Mon, Jun 23, 2014 at 5:27 PM, Mark Baker <dist...@acm.org> wrote:  
> Thanks for the context, Josh.  
>  
> I've gone ahead and created a new test case and just opened a new issue;  
>  
> https://github.com/uqfoundation/dill/issues/49  

So that one's dealt with; it was a sys.prefix issue caused by my use of a
virtualenv, and it was fixed in a soon-to-be pull request from a couple of
weeks ago.

With that patch applied, though, I'm now running into other doctest issues,
this time involving serializing Py4J objects, and again only occurring inside
doctests, not from the shell. I've been unable to distill this one down to a
compact test case or to gain any insight into the cause, and could really use
a nudge in the right direction.
Thanks!

Top and bottom of a sample trace (the excluded middle is the usual recursive
pickling calls):

File "pyspark/rdd.py", line 1487, in __main__.PipelinedRDD  
Failed example:  
rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect()  
Exception raised:  
Traceback (most recent call last):  
File "/usr/lib/python2.7/doctest.py", line 1289, in __run  
compileflags, 1) in test.globs  
File "<doctest __main__.PipelinedRDD[1]>", line 1, in <module>  
rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect()  
File "/home/mbaker/venvs/bibframe/src/spark-python3/python/pyspark/rdd.py",  
line 201, in cache  
self._jrdd.cache()  
File "/home/mbaker/venvs/bibframe/src/spark-python3/python/pyspark/rdd.py",  
line 1531, in _jrdd  
pickled_command = DillSerializer().dumps(command)  
File 
"/home/mbaker/venvs/bibframe/src/spark-python3/python/pyspark/serializers.py",  
line 284, in dumps  
def dumps(self, obj): return dill.dumps(obj, 2)  
File 
"/home/mbaker/venvs/bibframe/local/lib/python2.7/site-packages/dill-0.2.2.dev-py2.7.egg/dill/dill.py",
  
line 169, in dumps  
dump(obj, file, protocol, byref)  
File 
"/home/mbaker/venvs/bibframe/local/lib/python2.7/site-packages/dill-0.2.2.dev-py2.7.egg/dill/dill.py",
  
line 162, in dump  
pik.dump(obj)  
File "/usr/lib/python2.7/pickle.py", line 224, in dump  
self.save(obj)  
...  
File 
"/home/mbaker/venvs/bibframe/local/lib/python2.7/site-packages/dill-0.2.2.dev-py2.7.egg/dill/dill.py",
  
line 543, in save_module_dict  
StockPickler.save_dict(pickler, obj)  
File "/usr/lib/python2.7/pickle.py", line 650, in save_dict  
self._batch_setitems(obj.iteritems())  
File "/usr/lib/python2.7/pickle.py", line 682, in _batch_setitems  
save(v)  
File "/usr/lib/python2.7/pickle.py", line 307, in save  
rv = reduce(self.proto)  
File 
"/home/mbaker/venvs/bibframe/src/spark-python3/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
  
line 537, in __call__  
self.target_id, self.name)  
File 
"/home/mbaker/venvs/bibframe/src/spark-python3/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py",
  
line 304, in get_return_value  
format(target_id, '.', name, value))  
Py4JError: An error occurred while calling o15.__getnewargs__. Trace:  
py4j.Py4JException: Method __getnewargs__([]) does not exist  
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)  
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)  
at py4j.Gateway.invoke(Gateway.java:251)  
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)  
at py4j.commands.CallCommand.execute(CallCommand.java:79)  
at py4j.GatewayConnection.run(GatewayConnection.java:207)  
at java.lang.Thread.run(Thread.java:745)  
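
If I'm reading the tail of that right, pickle has reached a Py4J JavaObject
(o15) and, as part of protocol-2 reduction, looks up and calls
__getnewargs__, which Py4J forwards to the JVM, where no such method exists.
The final error itself can probably be provoked directly like this (a sketch
assuming an active SparkContext bound to `sc`; `sc._jsc` is just a convenient
JavaObject to poke at):

    import pickle

    # A protocol-2 pickle of a Py4J JavaObject should raise the same
    # Py4JError about __getnewargs__ as the trace above.
    pickle.dumps(sc._jsc, 2)

But that doesn't tell me why a JavaObject ends up on pickle's plate only when
the same code runs inside a doctest.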
