The ScalaTest code enclosed at the end of this email message demonstrates what appears to be a bug in the KryoSerializer. The code was executed from IntelliJ IDEA (Community Edition) under Mac OS X 10.11.2.

The KryoSerializer is enabled by updating the original SparkContext (supplied by the ScalaTest) via:

  1. reading the SparkConf from the SparkContext,
  2. updating the SparkConf to enable the KryoSerializer,
  3. stopping the original SparkContext, and
  4. creating a new SparkContext from the updated SparkConf.

After the KryoSerializer was enabled, execution of the following line (line 56):

  val rddPartitionsSizes: Array[Int] = rdd.mapPartitions(iter => Array(iter.size).iterator, true).collect

threw the following three instances of IllegalArgumentException:

  java.lang.IllegalArgumentException: Class is not registered: scala.collection.mutable.WrappedArray$ofInt
  java.lang.IllegalArgumentException: Class is not registered: int[]
  java.lang.IllegalArgumentException: Class is not registered: scala.Tuple3[]
These exceptions prompted registration of the following three classes with the KryoSerializer via the SparkConf.registerKryoClasses() method:

  classOf[scala.collection.mutable.WrappedArray.ofInt],
  classOf[Array[Int]],
  classOf[Array[Tuple3[_, _, _]]]

After these three classes were registered, the 'val rddPartitionsSizes...' line above (line 56) executed without throwing an IllegalArgumentException. However, execution of the following line (line 59):

  val sortedRddPartitionsSizes: Array[Int] = sortedRdd.mapPartitions(iter => Array(iter.size).iterator, true).collect

threw the following SparkException:

  org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2030)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1847)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1919)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:905)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:904)
    at kryo.KryoSerializerTest$$anonfun$1.apply$mcV$sp(KryoSerializerTest.scala:59)
    at kryo.KryoSerializerTest$$anonfun$1.apply(KryoSerializerTest.scala:39)
    at kryo.KryoSerializerTest$$anonfun$1.apply(KryoSerializerTest.scala:39)
    at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
    at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
    at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
    at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555)
    at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
    at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
    at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
    at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
    at org.scalatest.FunSuite.runTest(FunSuite.scala:1555)
    at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
    at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
    at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
    at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
    at org.scalatest.Suite$class.run(Suite.scala:1424)
    at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
    at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
    at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
    at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
    at kryo.KryoSerializerTest.org$scalatest$BeforeAndAfterAll$$super$run(KryoSerializerTest.scala:37)
    at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
    at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
    at kryo.KryoSerializerTest.run(KryoSerializerTest.scala:37)
    at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
    at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
    at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
    at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
    at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
    at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
    at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
    at org.scalatest.tools.Runner$.run(Runner.scala:883)
    at org.scalatest.tools.Runner.run(Runner.scala)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
  Caused by: java.io.IOException: java.lang.IllegalArgumentException: Class is not registered: scala.reflect.ClassTag$$anon$1
  Note: To register this class use: kryo.register(scala.reflect.ClassTag$$anon$1.class);
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
    at org.apache.spark.RangePartitioner.writeObject(Partitioner.scala:209)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
    at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
    at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 65 more
  Caused by: java.lang.IllegalArgumentException: Class is not registered: scala.reflect.ClassTag$$anon$1
  Note: To register this class use: kryo.register(scala.reflect.ClassTag$$anon$1.class);
    at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:442)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
    at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:565)
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
    at org.apache.spark.RangePartitioner$$anonfun$writeObject$1$$anonfun$apply$mcV$sp$1.apply(Partitioner.scala:220)
    at org.apache.spark.RangePartitioner$$anonfun$writeObject$1$$anonfun$apply$mcV$sp$1.apply(Partitioner.scala:219)
    at org.apache.spark.util.Utils$.serializeViaNestedStream(Utils.scala:128)
    at org.apache.spark.RangePartitioner$$anonfun$writeObject$1.apply$mcV$sp(Partitioner.scala:219)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)
    ... 124 more

Moreover, registering the following class did not eliminate the SparkException:

  classOf[scala.reflect.ClassTag[_]]

My guess is that the IllegalArgumentException associated with scala.reflect.ClassTag$$anon$1 has nothing to do with the SparkException, but I'm not certain. Note that none of the exceptions discussed above occurs if the KryoSerializer is not enabled. Also, none of the exceptions occurs if spark.kryo.registrationRequired is not set to true on line 44.
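One thing I noticed while poking at this, which may explain why registering classOf[scala.reflect.ClassTag[_]] had no effect (a plain-Scala sketch, no Spark required; the object name is just for illustration): Kryo's registration check appears to look up the exact runtime class of the instance being serialized, and a ClassTag value's runtime class is always some concrete implementation class (such as the anonymous scala.reflect.ClassTag$$anon$1 named in the exception), never the ClassTag trait itself.

```scala
import scala.reflect.ClassTag

object ClassTagRegistrationSketch {
  def main(args: Array[String]): Unit = {
    // The Class object for the ClassTag trait is distinct from the runtime
    // class of any ClassTag instance: an instance's getClass can never be a
    // trait. So registering classOf[ClassTag[_]] would not cover the
    // implementation class that Kryo actually encounters.
    val tag: ClassTag[Any] = implicitly[ClassTag[Any]]
    println(classOf[ClassTag[_]].getName) // the trait's class
    println(tag.getClass.getName)         // a concrete implementation class
    println(tag.getClass == classOf[ClassTag[_]]) // false
  }
}
```

If that is right, eliminating the IllegalArgumentException would require registering the implementation class itself, not the trait.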
So even the SparkException, which does not complain of an unregistered class, appears to be related to the class-registration requirement specified on line 44. Note also that the SparkException occurs only for the sorted RDD. In addition, no complaint of an unregistered class accompanies this SparkException, other than the complaint about the scala.reflect.ClassTag$$anon$1 class that I think is irrelevant (although I'm not certain).

So I have two questions:

First, why does line 59 give rise to the SparkException for the sorted RDD, in particular in the context of class registrations that eliminated the complaints about unregistered classes for the unsorted RDD? And how might this SparkException be eliminated?

Second, why does

  classOf[Array[Tuple3[_, _, _]]]

eliminate the 'java.lang.IllegalArgumentException: Class is not registered: scala.Tuple3[]' when drilling down in the debugger suggests that a more thorough registration would be

  classOf[Array[Tuple3[Int, Int, Array[Long]]]]

That is, why does the wildcard specification '_' suffice? And would the more thorough specification be preferred, that is, would it result in a smaller Kryo-serialized result?

Thanks in advance for any insight that you can provide into this problem.
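P.S. Regarding the second question, the behavior I see seems consistent with type erasure, which can be observed in plain Scala without Spark (the object name is just for this sketch): both classOf expressions denote the same runtime Class object, and Kryo registers runtime Class objects, so the wildcard form would necessarily cover the fully specified one.

```scala
object TupleErasureSketch {
  def main(args: Array[String]): Unit = {
    // Scala's type parameters are erased at runtime, so every
    // Array[Tuple3[...]] instantiation shares the single runtime class
    // [Lscala.Tuple3; regardless of the type arguments.
    val wildcard = classOf[Array[Tuple3[_, _, _]]]
    val specific = classOf[Array[Tuple3[Int, Int, Array[Long]]]]
    println(wildcard == specific) // true
    println(wildcard.getName)     // [Lscala.Tuple3;
  }
}
```

If the two Class objects are identical, the "more thorough" registration presumably could not change the serialized size either, but I would welcome confirmation.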
package kryo

import context.SharedSparkContext
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.FunSuite

class KryoSerializerTest extends FunSuite with SharedSparkContext with Serializable {

  test("kryo serializer") {
    // Update the SparkContext to specify the KryoSerializer
    val sparkConf: SparkConf = sc.getConf
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.set("spark.kryo.registrationRequired", "true")
    sparkConf.registerKryoClasses(
      Array(
        classOf[scala.collection.mutable.WrappedArray.ofInt],
        classOf[Array[Int]],
        classOf[Array[Tuple3[_, _, _]]]
      )
    )
    sc.stop()
    val sparkContext = new SparkContext(sparkConf)

    val rdd = sparkContext.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16), 4)
    val rddPartitionsSizes: Array[Int] =
      rdd.mapPartitions(iter => Array(iter.size).iterator, true).collect
    rddPartitionsSizes.foreach(ps => println(ps))

    val sortedRdd = rdd.sortBy(e => e, true)
    val sortedRddPartitionsSizes: Array[Int] =
      sortedRdd.mapPartitions(iter => Array(iter.size).iterator, true).collect
    sortedRddPartitionsSizes.foreach(ps => println(ps))
  }
}