Hello all,

I am not sure what is going on – I am getting a NotSerializedException and 
initially I thought it was due to not registering one of my classes with Kryo 
but that doesn’t seem to be the case. I am essentially eliminating duplicates 
in a spark streaming application by using a “window” to eliminate duplicates 
from the current batch and sending de-dup stats.

The strange thing is that my application is not affected by this error at all.

I tried registering with Kryo like this (though it was more out of desperation):

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Long])
    kryo.register(classOf[Tuple2[Int, Tuple2[Long, Long]]])
  }
}

Immediately before the exception, I have a piece of code that seems to be 
executed (at rdd.foreach) though I am not sure if that is the culprit:

Here is the code:

    msgCount.join(ddCount).foreachRDD((rdd: RDD[(Int, (Long, Long))]) => {
        rdd.foreach(item => item match {
          case (key, (oc, dc)) => {
            DebugLogger.log("Original event count = " + oc)
            DebugLogger.log("Found "+(oc-dc)+" duplicate(s) in "+oc+" events")
            if (enableOpStat) {
              try {
                val statBody = Array(("batchCount", oc.toString()),
                  ("duplicateCount", (oc-dc).toString()))
                OperationalStatProducer.produce(statBody)
              } catch { case e: Exception => DebugLogger.report(e) }
            }
          }
        })
      })

Here is the exception:

ERROR akka.actor.OneForOneStrategy - org.apache.spark.streaming.StreamingContext
java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
        at 
org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:168)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at 
org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:185)
        at 
org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:259)
        at 
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:167)
        at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

________________________________
This E-mail and any of its attachments may contain Time Warner Cable 
proprietary information, which is privileged, confidential, or subject to 
copyright belonging to Time Warner Cable. This E-mail is intended solely for 
the use of the individual or entity to which it is addressed. If you are not 
the intended recipient of this E-mail, you are hereby notified that any 
dissemination, distribution, copying, or action taken in relation to the 
contents of and attachments to this E-mail is strictly prohibited and may be 
unlawful. If you have received this E-mail in error, please notify the sender 
immediately and permanently delete the original and any copy of this E-mail and 
any printout.

Reply via email to