> What is the error? Could you file a JIRA for it?

Turns out there are actually 3 separate errors (indicated below), one of which **silently returns the wrong value to the user**. Should I file a separate JIRA for each one? What level should I mark these as (critical, major, etc.)?
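As an aside on ERROR 2 (the silent wrong value): the bad length reported in BLOCK 8 below is exactly the true serialized length modulo 2**32, which lines up with the 32-bit length field quoted from FramedSerializer's docstring below. A minimal arithmetic check (numbers copied from BLOCK 8; the struct.pack call is only an illustration of how a signed 32-bit pack behaves with a value this large, not a claim about where the truncation actually happens):

import struct

true_len = 6331339840            # "serialized length" printed in BLOCK 8
wrapped = true_len % 2 ** 32     # what is left after truncation to 32 bits

print wrapped                    # 2036372544 -- matches "length recovered" in BLOCK 8

# For contrast, packing the true length into a signed 32-bit field fails
# loudly on CPython 2.7 instead of wrapping silently:
try:
    struct.pack('!i', true_len)
except struct.error as e:
    print 'struct.error:', e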
I'm not sure that all of these are bugs as much as feature requests, since it looks like the design of FramedSerializer includes some size constraints (https://github.com/apache/spark/blob/master/python/pyspark/serializers.py: "Serializer that writes objects as a stream of (length, data) pairs, where C{length} is a 32-bit integer and data is C{length} bytes.").

Attempting to reproduce the bug in isolation in an IPython notebook, I've observed the following. Note that I'm running Python 2.7.3 on all machines and using the Spark 1.1.0 binaries.

**BLOCK 1** [no problem]

import cPickle
from pyspark import SparkContext

def check_pre_serialized(size):
    msg = cPickle.dumps(range(2 ** size))
    print 'serialized length:', len(msg)
    bvar = sc.broadcast(msg)
    print 'length recovered from broadcast variable:', len(bvar.value)
    print 'correct value recovered:', msg == bvar.value
    bvar.unpersist()

def check_unserialized(size):
    msg = range(2 ** size)
    bvar = sc.broadcast(msg)
    print 'correct value recovered:', msg == bvar.value
    bvar.unpersist()

SparkContext.setSystemProperty('spark.executor.memory', '15g')
SparkContext.setSystemProperty('spark.cores.max', '5')
sc = SparkContext('spark://crosby.research.intel-research.net:7077', 'broadcast_bug')

**BLOCK 2** [no problem]

check_pre_serialized(20)
> serialized length: 9374656
> length recovered from broadcast variable: 9374656
> correct value recovered: True

**BLOCK 3** [no problem]

check_unserialized(20)
> correct value recovered: True

**BLOCK 4** [no problem]

check_pre_serialized(27)
> serialized length: 1499501632
> length recovered from broadcast variable: 1499501632
> correct value recovered: True

**BLOCK 5** [no problem]

check_unserialized(27)
> correct value recovered: True

**BLOCK 6** [ERROR 1: unhandled error from cPickle.dumps inside sc.broadcast]

check_pre_serialized(28)
.....
> /home/spark/greatest/python/pyspark/serializers.py in dumps(self, obj)
>     354
>     355     def dumps(self, obj):
> --> 356         return cPickle.dumps(obj, 2)
>     357
>     358     loads = cPickle.loads
>
> SystemError: error return without exception set

**BLOCK 7** [no problem]

check_unserialized(28)
> correct value recovered: True

**BLOCK 8** [ERROR 2: no error occurs and *incorrect result* is returned]

check_pre_serialized(29)
> serialized length: 6331339840
> length recovered from broadcast variable: 2036372544
> correct value recovered: False

**BLOCK 9** [ERROR 3: unhandled error from zlib.compress inside sc.broadcast]

check_unserialized(29)
......
> /home/spark/greatest/python/pyspark/serializers.py in dumps(self, obj)
>     418
>     419     def dumps(self, obj):
> --> 420         return zlib.compress(self.serializer.dumps(obj), 1)
>     421
>     422     def loads(self, obj):
>
> OverflowError: size does not fit in an int

**BLOCK 10** [ERROR 1]

check_pre_serialized(30)
...same as above...

**BLOCK 11** [ERROR 3]

check_unserialized(30)
...same as above...

On Thu, Sep 25, 2014 at 2:55 PM, Davies Liu <dav...@databricks.com> wrote:
>
> On Thu, Sep 25, 2014 at 11:25 AM, Brad Miller
> <bmill...@eecs.berkeley.edu> wrote:
> > Hi Davies,
> >
> > Thanks for your help.
> >
> > I ultimately re-wrote the code to use broadcast variables, and then received
> > an error when trying to broadcast self.all_models that the size did not fit
> > in an int (recall that broadcasts use 32 bit ints to store size),
>
> What is the error? Could you file a JIRA for it?
>
> > that it was in fact over 2G.
> > I don't know why the previous tests (described above) where duplicated
> > portions of self.all_models worked (it could have been an error in either
> > my debugging or notes), but splitting the self.all_models into a separate
> > broadcast variable for each element worked. I avoided broadcast variables
> > for a while since there was no way to unpersist them in pyspark, but now
> > that there is you're completely right that using broadcast is the correct
> > way to code this.
>
> In 1.1, you could use broadcast.unpersist() to release it, also the performance
> of Python Broadcast was much improved in 1.1.
>
> > best,
> > -Brad
> >
> > On Tue, Sep 23, 2014 at 12:16 PM, Davies Liu <dav...@databricks.com> wrote:
> >>
> >> Or maybe there is a bug related to the base64 in py4j, could you
> >> dumps the serialized bytes of closure to verify this?
> >>
> >> You could add a line in spark/python/pyspark/rdd.py:
> >>
> >> ser = CloudPickleSerializer()
> >> pickled_command = ser.dumps(command)
> >> + print len(pickled_command), repr(pickled_command)
> >>
> >> On Tue, Sep 23, 2014 at 11:02 AM, Brad Miller
> >> <bmill...@eecs.berkeley.edu> wrote:
> >> > Hi Davies,
> >> >
> >> > That's interesting to know. Here's more details about my code. The object
> >> > ("self") contains pointers to the spark_context (which seems to generate
> >> > errors during serialization) so I strip off the extra state using the outer
> >> > lambda function and just pass the value self.all_models into the map.
> >> > all_models is a list of length 9 where each element contains 3 numbers
> >> > (ints or floats, can't remember) and then one LinearSVC object. The
> >> > classifier was trained over ~2.5M features, so the object isn't small, but
> >> > probably shouldn't be 150M either. Additionally, the call ran OK when I use
> >> > either 2x the first 5 objects or 2x the last 5 objects (another reason why
> >> > it seems unlikely the bug was size related).
> >> >
> >> > def _predict_all_models(all_models, sample):
> >> >     scores = []
> >> >     for _, (_, _, classifier) in all_models:
> >> >         score = classifier.decision_function(sample[VALUE][RECORD])
> >> >         scores.append(float(score))
> >> >     return (sample[VALUE][LABEL], scores)
> >> >
> >> > # fails
> >> > # return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models)
> >> > # works
> >> > # return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[:5] + self.all_models[:5])
> >> > # return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[4:] + self.all_models[4:])
> >> >
> >> > I've since written a work-around into my code, but if I get a chance I'll
> >> > switch to broadcast variables and see whether that works.
> >> >
> >> > later,
> >> > -brad
> >> >
> >> > On Mon, Sep 22, 2014 at 11:12 AM, Davies Liu <dav...@databricks.com> wrote:
> >> >>
> >> >> The traceback said that the serialized closure cannot be parsed (base64)
> >> >> correctly by py4j.
> >> >>
> >> >> The string in Java cannot be longer than 2G, so the serialized closure
> >> >> cannot longer than 1.5G (there are overhead in base64), is it possible
> >> >> that your data used in the map function is so big? If it's, you should
> >> >> use broadcast for it.
> >> >>
> >> >> In master of Spark, we will use broadcast automatically if the closure
> >> >> is too big.
> >> >> (but use broadcast explicitly is always better).
> >> >>
> >> >> On Sat, Sep 20, 2014 at 12:42 PM, Brad Miller
> >> >> <bmill...@eecs.berkeley.edu> wrote:
> >> >> > Hi All,
> >> >> >
> >> >> > I'm experiencing a java.lang.NegativeArraySizeException in a pyspark script
> >> >> > I have. I've pasted the full traceback at the end of this email.
> >> >> >
> >> >> > I have isolated the line of code in my script which "causes" the exception
> >> >> > to occur. Although the exception seems to occur deterministically, it is
> >> >> > very unclear why the different variants of the line would cause the
> >> >> > exception to occur. Unfortunately, I am only able to reproduce the bug in
> >> >> > the context of a large data processing job, and the line of code which
> >> >> > must change to reproduce the bug has little meaning out of context. The
> >> >> > bug occurs when I call "map" on an RDD with a function that references
> >> >> > some state outside of the RDD (which is presumably bundled up and
> >> >> > distributed with the function). The output of the function is a tuple
> >> >> > where the first element is an int and the second element is a list of
> >> >> > floats (same positive length every time, as verified by an 'assert'
> >> >> > statement).
> >> >> >
> >> >> > Given that:
> >> >> > - It's unclear why changes in the line would cause an exception
> >> >> > - The exception comes from within pyspark code
> >> >> > - The exception has to do with negative array sizes (and I couldn't have
> >> >> >   created a negative sized array anywhere in my python code)
> >> >> > I suspect this is a bug in pyspark.
> >> >> >
> >> >> > Has anybody else observed or reported this bug?
> >> >> >
> >> >> > best,
> >> >> > -Brad
> >> >> >
> >> >> > Traceback (most recent call last):
> >> >> >   File "/home/bmiller1/pipeline/driver.py", line 214, in <module>
> >> >> >     main()
> >> >> >   File "/home/bmiller1/pipeline/driver.py", line 203, in main
> >> >> >     bl.write_results(iteration_out_dir)
> >> >> >   File "/home/bmiller1/pipeline/layer/svm_layer.py", line 137, in write_results
> >> >> >     fig, accuracy = _get_results(self.prediction_rdd)
> >> >> >   File "/home/bmiller1/pipeline/layer/svm_layer.py", line 56, in _get_results
> >> >> >     predictions = np.array(prediction_rdd.collect())
> >> >> >   File "/home/spark/spark-1.1.0-bin-hadoop1/python/pyspark/rdd.py", line 723, in collect
> >> >> >     bytesInJava = self._jrdd.collect().iterator()
> >> >> >   File "/home/spark/spark-1.1.0-bin-hadoop1/python/pyspark/rdd.py", line 2026, in _jrdd
> >> >> >     broadcast_vars, self.ctx._javaAccumulator)
> >> >> >   File "/home/spark/spark-1.1.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 701, in __call__
> >> >> >   File "/home/spark/spark-1.1.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 304, in get_return_value
> >> >> > py4j.protocol.Py4JError: An error occurred while calling
> >> >> > None.org.apache.spark.api.python.PythonRDD.
> >> >> > Trace:
> >> >> > java.lang.NegativeArraySizeException
> >> >> >         at py4j.Base64.decode(Base64.java:292)
> >> >> >         at py4j.Protocol.getBytes(Protocol.java:167)
> >> >> >         at py4j.Protocol.getObject(Protocol.java:276)
> >> >> >         at py4j.commands.AbstractCommand.getArguments(AbstractCommand.java:81)
> >> >> >         at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:66)
> >> >> >         at py4j.GatewayConnection.run(GatewayConnection.java:207)
> >> >> >         at java.lang.Thread.run(Thread.java:701)
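For anyone who finds this thread later: below is a minimal sketch (not the pipeline code discussed above) of the per-element broadcast workaround described in the quoted messages, i.e. broadcasting each element of all_models separately so that no single broadcast payload gets near the 32-bit / 2 GB limits. The master URL, app name, and sample values are made up, and the model objects are replaced with plain lists of floats so the sketch is self-contained:

from pyspark import SparkContext

sc = SparkContext('local[2]', 'per_element_broadcast_sketch')

# Stand-ins for the real models (plain picklable data so the sketch runs
# anywhere); in the real pipeline each element would hold a LinearSVC.
all_models = [[float(k)] * 4 for k in range(9)]

# One broadcast per element instead of one broadcast of the whole list.
model_bvars = [sc.broadcast(m) for m in all_models]

def predict_all_models(sample):
    # score each sample against every model; bvar.value is read on the workers
    return [sum(w * x for w, x in zip(bvar.value, sample)) for bvar in model_bvars]

samples = sc.parallelize([[1.0, 2.0, 3.0, 4.0], [0.5, 0.5, 0.5, 0.5]])
print samples.map(predict_all_models).collect()

# Release the broadcasts once they are no longer needed (Spark 1.1+).
for bvar in model_bvars:
    bvar.unpersist()

The same pattern should work with the real LinearSVC tuples, as long as each individual broadcast stays well under the limits.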