Hi Davies,

Thanks for your help.
I ultimately rewrote the code to use broadcast variables, and then received an
error when trying to broadcast self.all_models saying that its size did not fit
in an int (recall that broadcasts use 32-bit ints to store the size), suggesting
that it was in fact over 2G. I don't know why the previous tests (described in
my earlier message quoted below) with duplicated portions of self.all_models
worked (it could have been an error in either my debugging or my notes), but
splitting self.all_models into a separate broadcast variable for each element
worked (a rough sketch of that pattern is included at the end of this message).
I avoided broadcast variables for a while since there was no way to unpersist
them in pyspark, but now that there is, you're completely right that using
broadcast is the correct way to code this.

best,
-Brad

On Tue, Sep 23, 2014 at 12:16 PM, Davies Liu <dav...@databricks.com> wrote:
> Or maybe there is a bug related to the base64 handling in py4j; could you
> dump the serialized bytes of the closure to verify this?
>
> You could add a line in spark/python/pyspark/rdd.py:
>
>     ser = CloudPickleSerializer()
>     pickled_command = ser.dumps(command)
> +   print len(pickled_command), repr(pickled_command)
>
>
> On Tue, Sep 23, 2014 at 11:02 AM, Brad Miller
> <bmill...@eecs.berkeley.edu> wrote:
> > Hi Davies,
> >
> > That's interesting to know. Here are more details about my code. The
> > object ("self") contains pointers to the spark_context (which seems to
> > generate errors during serialization), so I strip off the extra state
> > using the outer lambda function and just pass the value self.all_models
> > into the map. all_models is a list of length 9 where each element
> > contains 3 numbers (ints or floats, I can't remember) and then one
> > LinearSVC object. The classifier was trained over ~2.5M features, so the
> > object isn't small, but it probably shouldn't be 150M either.
> > Additionally, the call ran OK when I used either 2x the first 5 objects
> > or 2x the last 5 objects (another reason why it seems unlikely the bug
> > was size related).
> >
> > def _predict_all_models(all_models, sample):
> >     scores = []
> >     for _, (_, _, classifier) in all_models:
> >         score = classifier.decision_function(sample[VALUE][RECORD])
> >         scores.append(float(score))
> >     return (sample[VALUE][LABEL], scores)
> >
> > # fails
> > # return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models)
> > # works
> > # return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[:5] + self.all_models[:5])
> > # return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[4:] + self.all_models[4:])
> >
> > I've since written a work-around into my code, but if I get a chance I'll
> > switch to broadcast variables and see whether that works.
> >
> > later,
> > -brad
> >
> > On Mon, Sep 22, 2014 at 11:12 AM, Davies Liu <dav...@databricks.com> wrote:
> >>
> >> The traceback says that the serialized closure cannot be parsed
> >> (base64-decoded) correctly by py4j.
> >>
> >> A string in Java cannot be longer than 2G, so the serialized closure
> >> cannot be longer than roughly 1.5G (there is overhead in base64). Is it
> >> possible that the data used in your map function is that big? If it is,
> >> you should use a broadcast variable for it.
> >>
> >> In master of Spark, we will use broadcast automatically if the closure
> >> is too big (but using broadcast explicitly is always better).
> >>
> >> On Sat, Sep 20, 2014 at 12:42 PM, Brad Miller
> >> <bmill...@eecs.berkeley.edu> wrote:
> >> > Hi All,
> >> >
> >> > I'm experiencing a java.lang.NegativeArraySizeException in a pyspark
> >> > script I have. I've pasted the full traceback at the end of this email.
> >> >
> >> > I have isolated the line of code in my script which "causes" the
> >> > exception to occur. Although the exception seems to occur
> >> > deterministically, it is very unclear why the different variants of the
> >> > line would cause the exception to occur. Unfortunately, I am only able
> >> > to reproduce the bug in the context of a large data processing job, and
> >> > the line of code which must change to reproduce the bug has little
> >> > meaning out of context. The bug occurs when I call "map" on an RDD with
> >> > a function that references some state outside of the RDD (which is
> >> > presumably bundled up and distributed with the function). The output of
> >> > the function is a tuple where the first element is an int and the
> >> > second element is a list of floats (the same positive length every
> >> > time, as verified by an 'assert' statement).
> >> >
> >> > Given that:
> >> > - It's unclear why changes in the line would cause an exception
> >> > - The exception comes from within pyspark code
> >> > - The exception has to do with negative array sizes (and I couldn't
> >> >   have created a negative sized array anywhere in my python code)
> >> > I suspect this is a bug in pyspark.
> >> >
> >> > Has anybody else observed or reported this bug?
> >> >
> >> > best,
> >> > -Brad
> >> >
> >> > Traceback (most recent call last):
> >> >   File "/home/bmiller1/pipeline/driver.py", line 214, in <module>
> >> >     main()
> >> >   File "/home/bmiller1/pipeline/driver.py", line 203, in main
> >> >     bl.write_results(iteration_out_dir)
> >> >   File "/home/bmiller1/pipeline/layer/svm_layer.py", line 137, in write_results
> >> >     fig, accuracy = _get_results(self.prediction_rdd)
> >> >   File "/home/bmiller1/pipeline/layer/svm_layer.py", line 56, in _get_results
> >> >     predictions = np.array(prediction_rdd.collect())
> >> >   File "/home/spark/spark-1.1.0-bin-hadoop1/python/pyspark/rdd.py", line 723, in collect
> >> >     bytesInJava = self._jrdd.collect().iterator()
> >> >   File "/home/spark/spark-1.1.0-bin-hadoop1/python/pyspark/rdd.py", line 2026, in _jrdd
> >> >     broadcast_vars, self.ctx._javaAccumulator)
> >> >   File "/home/spark/spark-1.1.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 701, in __call__
> >> >   File "/home/spark/spark-1.1.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 304, in get_return_value
> >> > py4j.protocol.Py4JError: An error occurred while calling
> >> > None.org.apache.spark.api.python.PythonRDD. Trace:
> >> > java.lang.NegativeArraySizeException
> >> >     at py4j.Base64.decode(Base64.java:292)
> >> >     at py4j.Protocol.getBytes(Protocol.java:167)
> >> >     at py4j.Protocol.getObject(Protocol.java:276)
> >> >     at py4j.commands.AbstractCommand.getArguments(AbstractCommand.java:81)
> >> >     at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:66)
> >> >     at py4j.GatewayConnection.run(GatewayConnection.java:207)
> >> >     at java.lang.Thread.run(Thread.java:701)
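For reference, here is a minimal sketch of the per-element broadcast approach
described at the top of this thread. It is not the original code: the function
name is made up, sc is assumed to be a live SparkContext, and all_models,
testing_feature_rdd (passed in as feature_rdd), VALUE, RECORD, and LABEL are
assumed to mean what they do in the snippet quoted above.

def predict_with_per_element_broadcasts(sc, all_models, feature_rdd):
    # Broadcast each element of all_models separately so that no single
    # broadcast (and no pickled closure) has to hold the entire list.
    model_bcasts = [sc.broadcast(model) for model in all_models]

    def predict(sample):
        # Only the small Broadcast handles are captured in this closure;
        # the model data itself travels via Spark's broadcast mechanism.
        scores = []
        for bcast in model_bcasts:
            _, (_, _, classifier) = bcast.value
            scores.append(float(classifier.decision_function(sample[VALUE][RECORD])))
        return (sample[VALUE][LABEL], scores)

    predictions = feature_rdd.map(predict).collect()

    # Broadcasts can now be unpersisted in pyspark (as noted above), so
    # release the executor-side copies once the results are collected.
    for bcast in model_bcasts:
        bcast.unpersist()

    return predictions

The point is that each sc.broadcast() call stays well under the 2G serialized
size limit, and the closure shipped with map() contains only small handles.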