The ScalaTest code that is enclosed at the end of this email demonstrates what appears to be a bug in the KryoSerializer. The code was executed from IntelliJ IDEA (Community Edition) under Mac OS X 10.11.2.

The KryoSerializer is enabled by updating the original SparkContext (supplied by the ScalaTest) via:

1. reading the SparkConf from the SparkContext,
2. updating the SparkConf to enable the KryoSerializer,
3. stopping the original SparkContext, and
4. creating a new SparkContext from the updated SparkConf.
Following enabling of the KryoSerializer, execution of the following line (line 56):

    val rddPartitionsSizes: Array[Int] = rdd.mapPartitions(iter => Array(iter.size).iterator, true).collect

threw the following three instances of IllegalArgumentException:

java.lang.IllegalArgumentException: Class is not registered: scala.collection.mutable.WrappedArray$ofInt
java.lang.IllegalArgumentException: Class is not registered: int[]
java.lang.IllegalArgumentException: Class is not registered: scala.Tuple3[]

which prompted registration of the following three classes with the KryoSerializer via the SparkConf.registerKryoClasses() method:

        classOf[scala.collection.mutable.WrappedArray.ofInt],
        classOf[Array[Int]],
        classOf[Array[Tuple3[_, _, _]]]
Following registration of these three classes with the KryoSerializer, the 
above-indicated 'val rddPartitionsSizes...' line (line 56) executed without 
throwing an IllegalArgumentException.
However, execution of the following line (line 59):

    val sortedRddPartitionsSizes: Array[Int] = sortedRdd.mapPartitions(iter => Array(iter.size).iterator, true).collect
threw the following SparkException:
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2030)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1847)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1919)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:905)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:904)
    at kryo.KryoSerializerTest$$anonfun$1.apply$mcV$sp(KryoSerializerTest.scala:59)
    at kryo.KryoSerializerTest$$anonfun$1.apply(KryoSerializerTest.scala:39)
    at kryo.KryoSerializerTest$$anonfun$1.apply(KryoSerializerTest.scala:39)
    at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
    at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
    at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
    at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555)
    at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
    at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
    at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
    at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
    at org.scalatest.FunSuite.runTest(FunSuite.scala:1555)
    at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
    at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
    at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
    at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
    at org.scalatest.Suite$class.run(Suite.scala:1424)
    at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
    at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
    at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
    at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
    at kryo.KryoSerializerTest.org$scalatest$BeforeAndAfterAll$$super$run(KryoSerializerTest.scala:37)
    at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
    at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
    at kryo.KryoSerializerTest.run(KryoSerializerTest.scala:37)
    at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
    at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
    at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
    at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
    at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
    at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
    at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
    at org.scalatest.tools.Runner$.run(Runner.scala:883)
    at org.scalatest.tools.Runner.run(Runner.scala)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.io.IOException: java.lang.IllegalArgumentException: Class is not registered: scala.reflect.ClassTag$$anon$1
Note: To register this class use: kryo.register(scala.reflect.ClassTag$$anon$1.class);
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
    at org.apache.spark.RangePartitioner.writeObject(Partitioner.scala:209)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
    at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
    at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 65 more
Caused by: java.lang.IllegalArgumentException: Class is not registered: scala.reflect.ClassTag$$anon$1
Note: To register this class use: kryo.register(scala.reflect.ClassTag$$anon$1.class);
    at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:442)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
    at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:565)
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
    at org.apache.spark.RangePartitioner$$anonfun$writeObject$1$$anonfun$apply$mcV$sp$1.apply(Partitioner.scala:220)
    at org.apache.spark.RangePartitioner$$anonfun$writeObject$1$$anonfun$apply$mcV$sp$1.apply(Partitioner.scala:219)
    at org.apache.spark.util.Utils$.serializeViaNestedStream(Utils.scala:128)
    at org.apache.spark.RangePartitioner$$anonfun$writeObject$1.apply$mcV$sp(Partitioner.scala:219)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)
    ... 124 more


Moreover, registering the following class did not eliminate the SparkException:

        classOf[scala.reflect.ClassTag[_]]

My guess is that the IllegalArgumentException associated with scala.reflect.ClassTag$$anon$1 has nothing to do with the SparkException, but I'm not certain.
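A small Scala experiment suggests one reason that registration might not help: Kryo's registrationRequired check appears to match the exact runtime class, and scala.reflect.ClassTag$$anon$1 is a different Class object from the ClassTag trait itself. (The commented-out registration call below is only a guess on my part, not something I have verified.)

```scala
// Sketch: the class I registered and the class named in the exception differ.
object ClassTagRegistrationDemo {
  def main(args: Array[String]): Unit = {
    // This is what was registered:
    val tagTrait: Class[_] = classOf[scala.reflect.ClassTag[_]]
    println(tagTrait.getName)  // prints "scala.reflect.ClassTag"

    // But the exception names the anonymous subclass
    // "scala.reflect.ClassTag$$anon$1", which is not the same Class object,
    // so it would presumably have to be registered by name, e.g.:
    //   sparkConf.registerKryoClasses(Array(Class.forName("scala.reflect.ClassTag$$anon$1")))
  }
}
```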
Note that none of the exceptions discussed above occurs if the KryoSerializer is not enabled. Also, none of them occurs if spark.kryo.registrationRequired is not set to true on line 44. So even the SparkException, which does not complain of an unregistered class, appears to be related to the class-registration requirement specified on line 44.

Note also that the SparkException occurs only for the sorted RDD. In addition, no complaint about an unregistered class accompanies this SparkException, other than the one about the scala.reflect.ClassTag$$anon$1 class that I suspect isn't relevant (although I'm not certain).

So, I have two questions:
First, why does line 59 give rise to the SparkException for the sorted RDD, particularly in a context where class registration had already eliminated the complaints about unregistered classes for the unsorted RDD? And how might this SparkException be eliminated?

Second, why does

        classOf[Array[Tuple3[_, _, _]]]

eliminate the 'java.lang.IllegalArgumentException: Class is not registered: scala.Tuple3[]' when drilling down in the debugger suggests that a more thorough class registration would be

        classOf[Array[Tuple3[Int, Int, Array[Long]]]]

That is, why does the wildcard specification '_' suffice? And would the more thorough specification be preferred, that is, would it result in a smaller Kryo-serialized result?

Thanks in advance for any insight that you can provide into this problem.


package kryo

import context.SharedSparkContext
import org.apache.spark.{SparkContext, SparkConf}
import org.scalatest.FunSuite

class KryoSerializerTest extends FunSuite with SharedSparkContext with Serializable {

  test("kryo serializer") {

    // Update the SparkContext to specify the KryoSerializer
    val sparkConf: SparkConf = sc.getConf
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.set("spark.kryo.registrationRequired", "true")
    sparkConf.registerKryoClasses(
      Array(
        classOf[scala.collection.mutable.WrappedArray.ofInt],
        classOf[Array[Int]],
        classOf[Array[Tuple3[_, _, _]]]
      )
    )
    sc.stop()
    val sparkContext = new SparkContext(sparkConf)

    val rdd = sparkContext.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16), 4)
    val rddPartitionsSizes: Array[Int] = rdd.mapPartitions(iter => Array(iter.size).iterator, true).collect
    rddPartitionsSizes.foreach(ps => println(ps))
    val sortedRdd = rdd.sortBy(e => e, true)
    val sortedRddPartitionsSizes: Array[Int] = sortedRdd.mapPartitions(iter => Array(iter.size).iterator, true).collect
    sortedRddPartitionsSizes.foreach(ps => println(ps))
  }

}
