Can you enable the Java flag -Dsun.io.serialization.extendedDebugInfo=true for the driver in your driver startup script? That should show the chain of object references that leads to the StreamingContext being included in the closure.
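You can also test a suspect closure on its own, without running the whole job. Attempt plain Java serialization in the driver and read the exception message. The sketch below uses a hypothetical helper, `SerializationCheck.check`. With `-Dsun.io.serialization.extendedDebugInfo=true` on the JVM, the `NotSerializableException` message also lists the field path to the offending object. The JDK reads this property into a static field when `ObjectOutputStream` is initialized, so it has to be passed on the driver JVM's command line rather than set from application code.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationCheck {
  /** Attempts plain Java serialization of `obj` (hypothetical helper).
    * Returns None if it succeeds, or Some(message) if some object reachable
    * from `obj` is not Serializable. With the JVM flag
    * -Dsun.io.serialization.extendedDebugInfo=true the message also contains
    * the chain of fields that leads to the offending object. */
  def check(obj: AnyRef): Option[String] = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      None
    } catch {
      case e: NotSerializableException => Some(e.getMessage)
    } finally {
      out.close()
    }
  }
}
```

For example, `SerializationCheck.check(someFunction).foreach(println)` prints nothing if the function serializes. Otherwise it prints the offending class name, plus the reference chain when the flag is set.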
TD

On Thu, Aug 7, 2014 at 10:23 AM, Padmanabhan, Mahesh (contractor) <[email protected]> wrote:

> Thanks TD, but unfortunately that did not work.
>
> From: Tathagata Das <[email protected]>
> Date: Thursday, August 7, 2014 at 10:55 AM
> To: Mahesh Padmanabhan <[email protected]>
> Cc: "[email protected]" <[email protected]>
> Subject: Re: Spark 1.0.1 NotSerialized exception (a bit of a head scratcher)
>
> It could be because of the variable "enableOpStat". Since it's defined
> outside foreachRDD, referring to it inside rdd.foreach is probably
> causing the whole streaming context to be included in the closure. Scala
> funkiness. Try this and see if it works.
>
> msgCount.join(ddCount).foreachRDD((rdd: RDD[(Int, (Long, Long))]) => {
>   val enable = enableOpStat
>
>   rdd.foreach(item => item match {
>     case (key, (oc, dc)) => {
>       DebugLogger.log("Original event count = " + oc)
>       DebugLogger.log("Found " + (oc - dc) + " duplicate(s) in " + oc + " events")
>       if (enable) {
>         try {
>           val statBody = Array(("batchCount", oc.toString()),
>             ("duplicateCount", (oc - dc).toString()))
>           OperationalStatProducer.produce(statBody)
>         } catch { case e: Exception => DebugLogger.report(e) }
>       }
>     }
>   })
> })
>
> On Thu, Aug 7, 2014 at 9:03 AM, Padmanabhan, Mahesh (contractor) <[email protected]> wrote:
>
>> Hello all,
>>
>> I am not sure what is going on. I am getting a NotSerializableException,
>> and initially I thought it was due to not registering one of my classes
>> with Kryo, but that doesn't seem to be the case. I am essentially
>> eliminating duplicates in a Spark Streaming application by using a "window"
>> to remove duplicates from the current batch and sending de-dup stats.
>>
>> The strange thing is that my application is not affected by this error at
>> all.
>>
>> I tried registering with Kryo like this (though it was more out of
>> desperation):
>>
>> import com.esotericsoftware.kryo.Kryo
>> import org.apache.spark.serializer.KryoRegistrator
>>
>> class MyRegistrator extends KryoRegistrator {
>>   override def registerClasses(kryo: Kryo) {
>>     kryo.register(classOf[Long])
>>     kryo.register(classOf[Tuple2[Int, Tuple2[Long, Long]]])
>>   }
>> }
>>
>> Immediately before the exception, a piece of code seems to be executed
>> (at rdd.foreach), though I am not sure whether it is the culprit. Here is
>> the code:
>>
>> msgCount.join(ddCount).foreachRDD((rdd: RDD[(Int, (Long, Long))]) => {
>>   rdd.foreach(item => item match {
>>     case (key, (oc, dc)) => {
>>       DebugLogger.log("Original event count = " + oc)
>>       DebugLogger.log("Found " + (oc - dc) + " duplicate(s) in " + oc + " events")
>>       if (enableOpStat) {
>>         try {
>>           val statBody = Array(("batchCount", oc.toString()),
>>             ("duplicateCount", (oc - dc).toString()))
>>           OperationalStatProducer.produce(statBody)
>>         } catch { case e: Exception => DebugLogger.report(e) }
>>       }
>>     }
>>   })
>> })
>>
>> Here is the exception:
>>
>> ERROR akka.actor.OneForOneStrategy - org.apache.spark.streaming.StreamingContext
>> java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>     at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
>>     at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:168)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>     at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>     at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:185)
>>     at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:259)
>>     at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:167)
>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
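The capture behavior TD describes can be reproduced without Spark. This is a hedged sketch that assumes Scala 2.12 or 2.13, where a lambda body that does not use `this` does not capture it. `FakeStreamingContext`, `StreamingJob` and `ClosureCapture` are hypothetical stand-ins, not Spark classes. The sketch compares three variants:

* reading `enableOpStat` directly;
* copying it inside the outer closure, as in the suggested fix;
* copying it into a local `val` before the outer closure is built.

The stack trace shows the failure while the checkpoint writer serializes the DStream graph. That suggests the function passed to `foreachRDD` is serialized as a whole. If so, a copy made inside that function would not stop it from capturing the enclosing object.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for StreamingContext: deliberately NOT Serializable.
class FakeStreamingContext

// Hypothetical stand-in for the driver class that holds both the context and
// the flag. It is Serializable itself, so only the context field can fail.
class StreamingJob(val ssc: FakeStreamingContext, val enableOpStat: Boolean)
    extends Serializable {

  // Reads the field directly: both closures capture `this`, and with it `ssc`.
  def direct: Seq[Long] => Unit =
    batch => batch.foreach(n => if (enableOpStat) println(n))

  // Copies the flag, but inside the outer closure. The outer closure still
  // reads `this.enableOpStat`, so it still captures `this`.
  def copyInside: Seq[Long] => Unit =
    batch => {
      val enable = enableOpStat
      batch.foreach(n => if (enable) println(n))
    }

  // Copies the flag into a local val before the outer closure is created,
  // so (assuming Scala 2.12+) the closures capture only a Boolean.
  def copyOutside: Seq[Long] => Unit = {
    val enable = enableOpStat
    batch => batch.foreach(n => if (enable) println(n))
  }
}

object ClosureCapture {
  // True if `f` survives plain Java serialization, which is what the
  // checkpoint writer in the stack trace uses.
  def serializes(f: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(f); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }
}
```

In the job from this thread, the analogous change would be to write `val enable = enableOpStat` before `msgCount.join(ddCount).foreachRDD(...)` and refer only to `enable` inside both closures. Whether that is enough depends on what else the closures reference. For example, it matters whether `DebugLogger` and `OperationalStatProducer` are top-level objects (accessed statically) or members of the enclosing class (which would capture it again).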
