The serializer is created with

    val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)

which is definitely not the closure serializer, so it should respect what you are setting with spark.serializer. Maybe you can do a quick bit of debugging to see where that assumption breaks down? For example, are you sure spark.serializer is set everywhere?
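If it helps, here is a rough sketch of how you could check which serializer is actually being picked up on the driver and on the executors. This is just an illustrative snippet (it assumes a live SparkContext named sc), not anything from the Spark codebase:

    import org.apache.spark.SparkEnv

    // Print the serializer class SparkEnv hands back on the driver, plus the
    // raw spark.serializer setting from the driver's SparkConf.
    println("driver serializer:    " + SparkEnv.get.serializer.getClass.getName)
    println("spark.serializer    = " + sc.getConf.get("spark.serializer", "<not set>"))

    // Run the same check inside tasks to see what the executors resolve it to.
    val executorSerializers = sc
      .parallelize(1 to 100)
      .mapPartitions(_ => Iterator(SparkEnv.get.serializer.getClass.getName))
      .distinct()
      .collect()
    println("executor serializers: " + executorSerializers.mkString(", "))

If the executors report JavaSerializer while the driver reports KryoSerializer (or vice versa), that would explain the mismatch.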
On Wed, Feb 18, 2015 at 4:31 AM, Matt Cheah <mch...@palantir.com> wrote:

> Hi everyone,
>
> I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
> before, since I assumed that every aggregation required a key. However, I
> realized I could do my analysis using JavaRDD’s aggregate() instead and not
> use a key.
>
> I have set spark.serializer to use Kryo. As a result, JavaPairRDD’s
> combineByKey() requires that a “createCombiner” function be provided, and
> the return value of that function must be serializable with Kryo. When I
> switched to rdd.aggregate(), I assumed that the zero value would also be
> strictly Kryo serialized, since it is a data item and not part of a closure
> or of the aggregation functions. However, I got a serialization exception
> because the closure serializer (for which the only valid serializer is the
> Java serializer) was used instead.
>
> I was wondering the following:
>
> 1. What is the rationale for serializing the zero value with the closure
>    serializer? It isn’t part of the closure; it is an initial data item.
> 2. Would it make sense for us to write a version of rdd.aggregate() that
>    takes, as a parameter, a function that generates the zero value? A
>    function would be more intuitive to serialize with the closure serializer.
>
> I believe aggregateByKey is also affected.
>
> Thanks,
>
> -Matt Cheah
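For what it's worth, a rough sketch of the kind of aggregate variant proposed above might look like the following. This is purely hypothetical (aggregateWithZeroFactory is not an existing Spark method); it only illustrates the shape of the API:

    import scala.reflect.ClassTag
    import org.apache.spark.rdd.RDD

    // Hypothetical helper: the caller passes a zero-value factory, so each task
    // builds its own zero locally instead of having a pre-built zero value
    // shipped to it as data. Not part of Spark; for illustration only.
    def aggregateWithZeroFactory[T, U: ClassTag](rdd: RDD[T])(zeroFactory: () => U)(
        seqOp: (U, T) => U, combOp: (U, U) => U): U = {
      val perPartition = rdd.mapPartitions { iter =>
        // zeroFactory is captured in the task closure (and shipped by the
        // closure serializer), but the zero value it produces is created fresh
        // on the executor, so it is never sent from the driver.
        Iterator(iter.foldLeft(zeroFactory())(seqOp))
      }.collect()
      // Combine the per-partition results on the driver, starting from a fresh zero.
      perPartition.foldLeft(zeroFactory())(combOp)
    }

The point is just that the zero value is constructed inside each task rather than serialized on the driver and shipped out as a standalone data item.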