Does this help? I can’t figure out anything new from this extra information.
Thanks,
Mahesh
2014-08-07 12:27:00,170 [spark-akka.actor.default-dispatcher-4] ERROR akka.actor.OneForOneStrategy - org.apache.spark.streaming.StreamingContext
- field (class "com.twc.needle.ep.EventPersister$$anonfun$createStreamingContext$1", name: "ssc$1", type: "class org.apache.spark.streaming.StreamingContext")
- object (class "com.twc.needle.ep.EventPersister$$anonfun$createStreamingContext$1", <function1>)
- field (class "org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1", name: "foreachFunc$1", type: "interface scala.Function1")
- object (class "org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1", <function2>)
- field (class "org.apache.spark.streaming.dstream.ForEachDStream", name: "org$apache$spark$streaming$dstream$ForEachDStream$$foreachFunc", type: "interface scala.Function2")
- object (class "org.apache.spark.streaming.dstream.ForEachDStream", org.apache.spark.streaming.dstream.ForEachDStream@7d6dcf80)
- element of array (index: 1)
- array (class "[Ljava.lang.Object;", size: 16)
- field (class "scala.collection.mutable.ArrayBuffer", name: "array", type: "class [Ljava.lang.Object;")
- object (class "scala.collection.mutable.ArrayBuffer", ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@7129162d, org.apache.spark.streaming.dstream.ForEachDStream@7d6dcf80))
- field (class "org.apache.spark.streaming.DStreamGraph", name: "outputStreams", type: "class scala.collection.mutable.ArrayBuffer")
- custom writeObject data (class "org.apache.spark.streaming.DStreamGraph")
- object (class "org.apache.spark.streaming.DStreamGraph", org.apache.spark.streaming.DStreamGraph@23e0520a)
- field (class "org.apache.spark.streaming.Checkpoint", name: "graph", type: "class org.apache.spark.streaming.DStreamGraph")
- root object (class "org.apache.spark.streaming.Checkpoint", org.apache.spark.streaming.Checkpoint@73a68f0)
java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1180)
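Reading the extended debug output from the bottom up: the root object is the Checkpoint, whose DStreamGraph holds the ForEachDStream created by foreachRDD. Its foreachFunc wraps the function literal defined in EventPersister.createStreamingContext, and that function has a field ssc$1 of type StreamingContext. So the path appears to point at the ssc variable itself being referenced from the function passed to foreachRDD (or from a closure nested in it), and the failure happens while the checkpoint is written with Java serialization (CheckpointWriter.write in the stack trace). The Spark-free sketch below reproduces the same mechanism; FakeContext and ClosureCaptureDemo are hypothetical names, with FakeContext standing in for StreamingContext, and plain ObjectOutputStream standing in for checkpoint writing.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for StreamingContext: deliberately NOT Serializable.
class FakeContext(val appName: String)

object ClosureCaptureDemo {
  // Java-serialize an object, roughly what checkpointing does to the DStream graph.
  // Returns the exception message on failure, or the serialized size on success.
  def trySerialize(obj: AnyRef): Either[String, Int] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try {
      out.writeObject(obj)
      out.flush()
      Right(bytes.size())
    } catch {
      case e: NotSerializableException => Left(e.getMessage)
    } finally out.close()
  }

  // The function literal mentions ctx, so ctx is stored inside the closure
  // (the analogue of the ssc$1 field in the log above).
  def capturing(ctx: FakeContext): Long => String =
    (n: Long) => ctx.appName + ": " + n

  // Only the String is copied into a local val, so ctx is not captured.
  def nonCapturing(ctx: FakeContext): Long => String = {
    val name = ctx.appName
    (n: Long) => name + ": " + n
  }

  def main(args: Array[String]): Unit = {
    val ctx = new FakeContext("EventPersister")
    println(trySerialize(capturing(ctx)))    // a Left naming FakeContext
    println(trySerialize(nonCapturing(ctx))) // a Right with the byte count
  }
}
```

The usual remedy in this situation is to pull whatever is needed out of the context before the closure is created (or create it inside the closure when needed), so the context never becomes a captured free variable.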
From: Tathagata Das <[email protected]>
Date: Thursday, August 7, 2014 at 11:31 AM
To: Mahesh Padmanabhan <[email protected]>
Cc: "[email protected]" <[email protected]>
Subject: Re: Spark 1.0.1 NotSerialized exception (a bit of a head scratcher)
Can you enable the Java flag -Dsun.io.serialization.extendedDebugInfo=true for the driver in your driver startup script? That should show the sequence of object references that leads to the StreamingContext being included in the closure.
TD
On Thu, Aug 7, 2014 at 10:23 AM, Padmanabhan, Mahesh (contractor) <[email protected]> wrote:
Thanks TD, but unfortunately that did not work.
From: Tathagata Das <[email protected]>
Date: Thursday, August 7, 2014 at 10:55 AM
To: Mahesh Padmanabhan <[email protected]>
Cc: "[email protected]" <[email protected]>
Subject: Re: Spark 1.0.1 NotSerialized exception (a bit of a head scratcher)
It could be because of the variable "enableOpStat". Since it's defined outside foreachRDD, referring to it inside rdd.foreach is probably causing the whole streaming context to be included in the closure. Scala funkiness. Try this and see if it works:
msgCount.join(ddCount).foreachRDD((rdd: RDD[(Int, (Long, Long))]) => {
  // Copy the flag into a local val so the inner closure captures only a Boolean
  val enable = enableOpStat
  rdd.foreach(item => item match {
    case (key, (oc, dc)) => {
      DebugLogger.log("Original event count = " + oc)
      DebugLogger.log("Found " + (oc - dc) + " duplicate(s) in " + oc + " events")
      if (enable) {
        try {
          val statBody = Array(("batchCount", oc.toString()),
            ("duplicateCount", (oc - dc).toString()))
          OperationalStatProducer.produce(statBody)
        } catch { case e: Exception => DebugLogger.report(e) }
      }
    }
  })
})
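To illustrate the Scala behaviour described here, a minimal Spark-free sketch follows. It assumes, hypothetically, that enableOpStat is a field of an enclosing class that is not serializable; Persister and FieldCaptureDemo are made-up names. Reading the field inside a function literal makes the closure hold a reference to the whole enclosing object, whereas copying it into a local val first means only the Boolean is captured. Plain Java serialization stands in for what Spark does when it ships or checkpoints a closure, and the exact compiled shape of closures differs between Scala versions.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical non-serializable owner of the flag (not from the original code).
class Persister(val enableOpStat: Boolean) {
  // Reads the field directly: the closure must keep `this` (a Persister).
  def direct: Long => Boolean = (n: Long) => enableOpStat && n > 0

  // Suggested pattern: copy the field into a local val, then close over the copy.
  def viaLocal: Long => Boolean = {
    val enable = enableOpStat
    (n: Long) => enable && n > 0
  }
}

object FieldCaptureDemo {
  // True if Java serialization of obj succeeds.
  def isSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val p = new Persister(enableOpStat = true)
    println(isSerializable(p.direct))   // false: Persister was captured
    println(isSerializable(p.viaLocal)) // true: only the Boolean was captured
  }
}
```

Note that this pattern only helps when the unwanted object is reached through the field access; if the closure refers to the context object directly, copying an unrelated flag does not remove that reference.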
On Thu, Aug 7, 2014 at 9:03 AM, Padmanabhan, Mahesh (contractor) <[email protected]> wrote:
Hello all,
I am not sure what is going on. I am getting a NotSerializableException, and initially I thought it was due to not registering one of my classes with Kryo, but that doesn't seem to be the case. I am essentially de-duplicating events in a Spark Streaming application: I use a “window” to eliminate duplicates from the current batch and then send de-dup stats.
The strange thing is that my application is not affected by this error at all.
I tried registering with Kryo like this (though it was more out of desperation):
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Long])
    kryo.register(classOf[Tuple2[Int, Tuple2[Long, Long]]])
  }
}
Immediately before the exception, the following code seems to be executed (at rdd.foreach), though I am not sure whether it is the culprit. Here is the code:
msgCount.join(ddCount).foreachRDD((rdd: RDD[(Int, (Long, Long))]) => {
  rdd.foreach(item => item match {
    case (key, (oc, dc)) => {
      DebugLogger.log("Original event count = " + oc)
      DebugLogger.log("Found " + (oc - dc) + " duplicate(s) in " + oc + " events")
      if (enableOpStat) {
        try {
          val statBody = Array(("batchCount", oc.toString()),
            ("duplicateCount", (oc - dc).toString()))
          OperationalStatProducer.produce(statBody)
        } catch { case e: Exception => DebugLogger.report(e) }
      }
    }
  })
})
Here is the exception:
ERROR akka.actor.OneForOneStrategy - org.apache.spark.streaming.StreamingContext
java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:168)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:185)
at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:259)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:167)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)