On a slightly related note (apologies Soren for hijacking the thread): Reynold, how much better is Kryo, from Spark's usage point of view, than the default Java serialization (in general, not for closures)? The numbers on the Kryo site are interesting, but since you have played the most with Kryo in the context of Spark (I think), how do you rate it along the lines of:

1) computational overhead compared to Java serialization,
2) memory overhead, and
3) generated byte[] size?

Particularly given the bugs Patrick and I had looked into in the past around flush, etc., I was always skeptical about using Kryo. But given the pretty nasty OOM issues we are seeing with Java serialization, I wanted to know your thoughts on using Kryo with Spark. (It will be slightly involved to ensure everything gets registered, but I want to go down that path, assuming I hear good things in the context of Spark.)

Thanks,
Mridul
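For concreteness, the per-class registration Mridul alludes to looks roughly like this under the Spark 1.0-era API. This is only a sketch: MyClass, mypackage, MyRegistrator, and SizeCheck are placeholders, and the size comparison at the bottom is just a rough way to eyeball point 3 on a single object, not a benchmark.

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    import com.esotericsoftware.kryo.Kryo
    import com.esotericsoftware.kryo.io.Output
    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.KryoRegistrator

    // Placeholder type standing in for whatever actually crosses the wire.
    case class MyClass(id: Int, name: String)

    // Every class Kryo will see should be registered here; unregistered
    // classes make Kryo write fully-qualified class names, inflating the
    // serialized byte[] size.
    class MyRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[MyClass])
      }
    }

    object SizeCheck {
      def main(args: Array[String]) {
        // Wiring the registrator into Spark (1.0-era config keys); in a
        // real job this conf would be passed to new SparkContext(conf).
        val conf = new SparkConf()
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .set("spark.kryo.registrator", "mypackage.MyRegistrator")

        val sample = MyClass(42, "forty-two")

        // Java serialization size (case classes are Serializable).
        val javaBos = new ByteArrayOutputStream()
        val oos = new ObjectOutputStream(javaBos)
        oos.writeObject(sample)
        oos.close()

        // Kryo size, with the class registered; this only measures the
        // write path, so no no-arg constructor is needed.
        val kryo = new Kryo()
        kryo.register(classOf[MyClass])
        val kryoBos = new ByteArrayOutputStream()
        val out = new Output(kryoBos)
        kryo.writeClassAndObject(out, sample)
        out.close()

        println(s"java: ${javaBos.size()} bytes, kryo: ${kryoBos.size()} bytes")
      }
    }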
On Mon, May 5, 2014 at 1:20 AM, Reynold Xin <r...@databricks.com> wrote:
> I added the config option to use a non-default serializer. However, at
> the time, Kryo failed to serialize pretty much any closure, so that option
> was never really used / recommended.
>
> Since then the Scala ecosystem has developed, and some other projects are
> starting to use Kryo to serialize more Scala data structures, so I wouldn't
> be surprised if there is a way to work around this now. However, I don't
> have enough time to look into it at this point. If you do, please do post
> your findings. Thanks.
>
>
> On Sun, May 4, 2014 at 10:35 AM, Soren Macbeth <so...@yieldbot.com> wrote:
>
>> Apologies for the cross-list posts, but I've gotten zero response on the
>> user list and I guess this list is probably more appropriate.
>>
>> According to the documentation, using the KryoSerializer for closures is
>> supported. However, when I try to set `spark.closure.serializer` to
>> `org.apache.spark.serializer.KryoSerializer`, things fail pretty miserably.
>>
>> The first thing that happens is that it throws exceptions, over and over,
>> saying that it cannot locate my registrator class, which is located in my
>> assembly jar like so:
>>
>> 14/05/04 12:03:20 ERROR serializer.KryoSerializer: Failed to run spark.kryo.registrator
>> java.lang.ClassNotFoundException: pickles.kryo.PicklesRegistrator
>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>         at java.security.AccessController.doPrivileged(Native Method)
>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>         at java.lang.Class.forName0(Native Method)
>>         at java.lang.Class.forName(Class.java:270)
>>         at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:63)
>>         at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:61)
>>         at scala.Option.foreach(Option.scala:236)
>>         at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:61)
>>         at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:116)
>>         at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:79)
>>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:180)
>>         at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>>         at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
>>         at java.security.AccessController.doPrivileged(Native Method)
>>         at javax.security.auth.Subject.doAs(Subject.java:415)
>>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
>>         at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:724)
>>
>> Now, I would expect it not to be able to find this class, since it hasn't
>> yet fetched my assembly jar to the executors. Once it does fetch my jar,
>> those exceptions stop. Next, all the executor tasks die with the following
>> exception:
>>
>> java.nio.ReadOnlyBufferException
>>         at java.nio.ByteBuffer.array(ByteBuffer.java:961)
>>         at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:136)
>>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193)
>>         at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>>         at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
>>         at java.security.AccessController.doPrivileged(Native Method)
>>         at javax.security.auth.Subject.doAs(Subject.java:415)
>>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
>>         at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:724)
>>
>> AFAIK, I'm not doing anything out of the ordinary, just turning on Kryo
>> and using the registrator mechanism to register a couple of custom
>> serializers.
>>
>> The reason I tried turning on Kryo for closures in the first place is
>> because of a different bug I was hitting while fetching and deserializing
>> tasks on my executors, which I detailed here:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Crazy-Kryo-Exception-td5257.html
>>
>> Here's hoping someone on this list can help me track down what's
>> happening, as I didn't get a single reply on the user list.
>>
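For anyone retracing Soren's steps, the configuration he describes would look roughly like the sketch below. It targets the Spark 1.x SparkConf API (spark.closure.serializer was a real config key in 1.x and defaulted to the Java serializer); pickles.kryo.PicklesRegistrator is the registrator class from his stack trace.

    import org.apache.spark.{SparkConf, SparkContext}

    // Kryo for both data and closures: spark.serializer covers shuffled /
    // cached data, spark.closure.serializer covers the task closures.
    val conf = new SparkConf()
      .setAppName("kryo-closures")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "pickles.kryo.PicklesRegistrator")
    val sc = new SparkContext(conf)

Note that the registrator class has to be visible on the executor classpath (for example via the assembly jar), which is consistent with the transient ClassNotFoundException above clearing once the jar is fetched.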