> What is the error? Could you file a JIRA for it?

Turns out there are actually 3 separate errors (indicated below), one of
which **silently returns the wrong value to the user**.  Should I file a
separate JIRA for each one?  What level should I mark these as (critical,
major, etc.)?

I'm not sure whether all of these are really bugs rather than feature
requests, since it looks like the design of FramedSerializer includes some
size constraints
(https://github.com/apache/spark/blob/master/python/pyspark/serializers.py:
"Serializer that writes objects as a stream of (length, data) pairs, where
C{length} is a 32-bit integer and data is C{length} bytes.").
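
To make the size constraint concrete, here's a minimal sketch of that
(length, data) framing (my own illustration, not the actual pyspark code):
once a payload reaches 2**31 bytes, a signed 32-bit length field can no
longer represent it.

import struct

def write_framed(stream, data):
    # '!i' is a signed 32-bit big-endian int; struct.error is raised
    # as soon as len(data) exceeds 2**31 - 1
    stream.write(struct.pack('!i', len(data)))
    stream.write(data)

def read_framed(stream):
    (length,) = struct.unpack('!i', stream.read(4))
    return stream.read(length)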

Attempting to reproduce the bug in isolation in an IPython notebook, I've
observed the following. Note that I'm running Python 2.7.3 on all machines
and using the Spark 1.1.0 binaries.

**BLOCK 1**  [no problem]
import cPickle
from pyspark import SparkContext

def check_pre_serialized(size):
    # pickle the payload on the driver first, then broadcast the raw bytes
    msg = cPickle.dumps(range(2 ** size))
    print 'serialized length:', len(msg)
    bvar = sc.broadcast(msg)
    print 'length recovered from broadcast variable:', len(bvar.value)
    print 'correct value recovered:', msg == bvar.value
    bvar.unpersist()

def check_unserialized(size):
    # broadcast the raw list and let pyspark handle the serialization
    msg = range(2 ** size)
    bvar = sc.broadcast(msg)
    print 'correct value recovered:', msg == bvar.value
    bvar.unpersist()

SparkContext.setSystemProperty('spark.executor.memory', '15g')
SparkContext.setSystemProperty('spark.cores.max', '5')
sc = SparkContext('spark://crosby.research.intel-research.net:7077',
                  'broadcast_bug')

**BLOCK 2**  [no problem]
check_pre_serialized(20)
> serialized length: 9374656
> length recovered from broadcast variable: 9374656
> correct value recovered: True

**BLOCK 3**  [no problem]
check_unserialized(20)
> correct value recovered: True

**BLOCK 4**  [no problem]
check_pre_serialized(27)
> serialized length: 1499501632
> length recovered from broadcast variable: 1499501632
> correct value recovered: True

**BLOCK 5**  [no problem]
check_unserialized(27)
> correct value recovered: True

**BLOCK 6**  [ERROR 1: unhandled error from cPickle.dumps inside sc.broadcast]
check_pre_serialized(28)
.....
> /home/spark/greatest/python/pyspark/serializers.py in dumps(self, obj)
>     354
>     355     def dumps(self, obj):
> --> 356         return cPickle.dumps(obj, 2)
>     357
>     358     loads = cPickle.loads
>
> SystemError: error return without exception set

**BLOCK 7**  [no problem]
check_unserialized(28)
> correct value recovered: True

**BLOCK 8**  [ERROR 2: no error occurs and an *incorrect result* is returned]
check_pre_serialized(29)
> serialized length: 6331339840
> length recovered from broadcast variable: 2036372544
> correct value recovered: False
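
As a quick sanity check on the 32-bit size theory: the length recovered in
BLOCK 8 is exactly the true length modulo 2**32, which is what you'd expect
if the size were being truncated to a 32-bit field:

print 6331339840 % 2 ** 32
> 2036372544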

**BLOCK 9**  [ERROR 3: unhandled error from zlib.compress inside sc.broadcast]
check_unserialized(29)
......
> /home/spark/greatest/python/pyspark/serializers.py in dumps(self, obj)
>     418
>     419     def dumps(self, obj):
> --> 420         return zlib.compress(self.serializer.dumps(obj), 1)
>     421
>     422     def loads(self, obj):
>
> OverflowError: size does not fit in an int

**BLOCK 10**  [ERROR 1]
check_pre_serialized(30)
...same as above...

**BLOCK 11**  [ERROR 3]
check_unserialized(30)
...same as above...
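
In case it's useful for triage: the OverflowError in BLOCK 9 says the input
size doesn't fit in an int, so one possible workaround (my own sketch, not
what pyspark currently does) would be to compress in chunks via
zlib.compressobj so that no single zlib call ever sees a >2GB buffer:

import zlib

def compress_large(data, chunk_size=1 << 28):
    # feed at most 256MB per call so the length passed into zlib
    # always fits in a C int
    c = zlib.compressobj(1)
    out = []
    for i in xrange(0, len(data), chunk_size):
        out.append(c.compress(data[i:i + chunk_size]))
    out.append(c.flush())
    return ''.join(out)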

On Thu, Sep 25, 2014 at 2:55 PM, Davies Liu <dav...@databricks.com> wrote:
>
> On Thu, Sep 25, 2014 at 11:25 AM, Brad Miller
> <bmill...@eecs.berkeley.edu> wrote:
> > Hi Davies,
> >
> > Thanks for your help.
> >
> > I ultimately re-wrote the code to use broadcast variables, and then received
> > an error when trying to broadcast self.all_models that the size did not fit
> > in an int (recall that broadcasts use 32 bit ints to store size),
>
> What is the error? Could you file a JIRA for it?
>
> > that it was in fact over 2G.  I don't know why the previous tests (described
> > above) where duplicated portions of self.all_models worked (it could have
> > been an error in either my debugging or notes), but splitting the
> > self.all_models into a separate broadcast variable for each element worked.
> > I avoided broadcast variables for a while since there was no way to
> > unpersist them in pyspark, but now that there is you're completely right
> > that using broadcast is the correct way to code this.
>
> In 1.1, you could use broadcast.unpersist() to release it, also the performance
> of Python Broadcast was much improved in 1.1.
>
>
> > best,
> > -Brad
> >
> > On Tue, Sep 23, 2014 at 12:16 PM, Davies Liu <dav...@databricks.com> wrote:
> >>
> >> Or maybe there is a bug related to the base64 in py4j, could you
> >> dumps the serialized bytes of closure to verify this?
> >>
> >> You could add a line in spark/python/pyspark/rdd.py:
> >>
> >>         ser = CloudPickleSerializer()
> >>         pickled_command = ser.dumps(command)
> >> +      print len(pickled_command), repr(pickled_command)
> >>
> >>
> >> On Tue, Sep 23, 2014 at 11:02 AM, Brad Miller
> >> <bmill...@eecs.berkeley.edu> wrote:
> >> > Hi Davies,
> >> >
> >> > That's interesting to know.  Here's more details about my code.  The
> >> > object ("self") contains pointers to the spark_context (which seems to
> >> > generate errors during serialization) so I strip off the extra state
> >> > using the outer lambda function and just pass the value self.all_models
> >> > into the map.  all_models is a list of length 9 where each element
> >> > contains 3 numbers (ints or floats, can't remember) and then one
> >> > LinearSVC object.  The classifier was trained over ~2.5M features, so
> >> > the object isn't small, but probably shouldn't be 150M either.
> >> > Additionally, the call ran OK when I use either 2x the first 5 objects
> >> > or 2x the last 5 objects (another reason why it seems unlikely the bug
> >> > was size related).
> >> >
> >> > def _predict_all_models(all_models, sample):
> >> >     scores = []
> >> >     for _, (_, _, classifier) in all_models:
> >> >         score = classifier.decision_function(sample[VALUE][RECORD])
> >> >         scores.append(float(score))
> >> >     return (sample[VALUE][LABEL], scores)
> >> >
> >> >     # fails
> >> > #    return (lambda am: testing_feature_rdd.map(lambda x:
> >> > _predict_all_models(am, x))) (self.all_models)
> >> >     # works
> >> > #    return (lambda am: testing_feature_rdd.map(lambda x:
> >> > _predict_all_models(am, x))) (self.all_models[:5] + self.all_models[:5])
> >> > #    return (lambda am: testing_feature_rdd.map(lambda x:
> >> > _predict_all_models(am, x))) (self.all_models[4:] + self.all_models[4:])
> >> >
> >> > I've since written a work-around into my code, but if I get a chance
> >> > I'll switch to broadcast variables and see whether that works.
> >> >
> >> > later,
> >> > -brad
> >> >
> >> > On Mon, Sep 22, 2014 at 11:12 AM, Davies Liu <dav...@databricks.com>
> >> > wrote:
> >> >>
> >> >> The traceback said that the serialized closure cannot be parsed
> >> >> (base64) correctly by py4j.
> >> >>
> >> >> The string in Java cannot be longer than 2G, so the serialized closure
> >> >> cannot longer than 1.5G (there are overhead in base64), is it possible
> >> >> that your data used in the map function is so big? If it's, you should
> >> >> use broadcast for it.
> >> >>
> >> >> In master of Spark, we will use broadcast automatically if the closure
> >> >> is too big. (but use broadcast explicitly is always better).
> >> >>
> >> >> On Sat, Sep 20, 2014 at 12:42 PM, Brad Miller
> >> >> <bmill...@eecs.berkeley.edu> wrote:
> >> >> > Hi All,
> >> >> >
> >> >> > I'm experiencing a java.lang.NegativeArraySizeException in a pyspark
> >> >> > script I have.  I've pasted the full traceback at the end of this email.
> >> >> >
> >> >> > I have isolated the line of code in my script which "causes" the
> >> >> > exception to occur. Although the exception seems to occur
> >> >> > deterministically, it is very unclear why the different variants of
> >> >> > the line would cause the exception to occur. Unfortunately, I am
> >> >> > only able to reproduce the bug in the context of a large data
> >> >> > processing job, and the line of code which must change to reproduce
> >> >> > the bug has little meaning out of context.  The bug occurs when I
> >> >> > call "map" on an RDD with a function that references some state
> >> >> > outside of the RDD (which is presumably bundled up and distributed
> >> >> > with the function).  The output of the function is a tuple where the
> >> >> > first element is an int and the second element is a list of floats
> >> >> > (same positive length every time, as verified by an 'assert'
> >> >> > statement).
> >> >> >
> >> >> > Given that:
> >> >> > -It's unclear why changes in the line would cause an exception
> >> >> > -The exception comes from within pyspark code
> >> >> > -The exception has to do with negative array sizes (and I couldn't
> >> >> > have created a negative sized array anywhere in my python code)
> >> >> > I suspect this is a bug in pyspark.
> >> >> >
> >> >> > Has anybody else observed or reported this bug?
> >> >> >
> >> >> > best,
> >> >> > -Brad
> >> >> >
> >> >> > Traceback (most recent call last):
> >> >> >   File "/home/bmiller1/pipeline/driver.py", line 214, in <module>
> >> >> >     main()
> >> >> >   File "/home/bmiller1/pipeline/driver.py", line 203, in main
> >> >> >     bl.write_results(iteration_out_dir)
> >> >> >   File "/home/bmiller1/pipeline/layer/svm_layer.py", line 137, in
> >> >> > write_results
> >> >> >     fig, accuracy = _get_results(self.prediction_rdd)
> >> >> >   File "/home/bmiller1/pipeline/layer/svm_layer.py", line 56, in
> >> >> > _get_results
> >> >> >     predictions = np.array(prediction_rdd.collect())
> >> >> >   File "/home/spark/spark-1.1.0-bin-hadoop1/python/pyspark/rdd.py",
> >> >> > line 723, in collect
> >> >> >     bytesInJava = self._jrdd.collect().iterator()
> >> >> >   File "/home/spark/spark-1.1.0-bin-hadoop1/python/pyspark/rdd.py",
> >> >> > line 2026, in _jrdd
> >> >> >     broadcast_vars, self.ctx._javaAccumulator)
> >> >> >   File
> >> >> > "/home/spark/spark-1.1.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
> >> >> > line 701, in __call__
> >> >> >   File
> >> >> > "/home/spark/spark-1.1.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
> >> >> > line 304, in get_return_value
> >> >> > py4j.protocol.Py4JError: An error occurred while calling
> >> >> > None.org.apache.spark.api.python.PythonRDD. Trace:
> >> >> > java.lang.NegativeArraySizeException
> >> >> > at py4j.Base64.decode(Base64.java:292)
> >> >> > at py4j.Protocol.getBytes(Protocol.java:167)
> >> >> > at py4j.Protocol.getObject(Protocol.java:276)
> >> >> > at py4j.commands.AbstractCommand.getArguments(AbstractCommand.java:81)
> >> >> > at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:66)
> >> >> > at py4j.GatewayConnection.run(GatewayConnection.java:207)
> >> >> > at java.lang.Thread.run(Thread.java:701)
> >> >> >
> >> >
> >> >
> >
> >
