Can you enable the Java flag -Dsun.io.serialization.extendedDebugInfo=true
for the driver, in your driver startup script? That should show the
sequence of object references that leads to the StreamingContext being
included in the closure.

TD
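The sketch below is a hedged illustration of what the flag changes. It uses plain Java serialization with no Spark, and GraphHolder, UnserializableContext and ExtendedDebugInfoDemo are hypothetical names. The JDK reads sun.io.serialization.extendedDebugInfo once, when ObjectOutputStream is initialized, so the flag belongs on the driver JVM's command line (for example spark-submit --driver-java-options "-Dsun.io.serialization.extendedDebugInfo=true", or spark.driver.extraJavaOptions in a properties file) rather than being set from inside the application. Run with the flag, the exception message should be followed by the field/object path from GraphHolder to UnserializableContext, which is the kind of reference chain asked for above.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical classes: a non-serializable "context" reachable through a
// serializable holder, loosely mimicking a StreamingContext reachable from
// the object graph being checkpointed.
class UnserializableContext
class GraphHolder extends Serializable {
  val context = new UnserializableContext
}

object ExtendedDebugInfoDemo {
  // Java-serializes obj; returns the NotSerializableException message on
  // failure, or None if serialization succeeds.
  def serializationError(obj: AnyRef): Option[String] = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); None }
    catch { case e: NotSerializableException => Some(e.getMessage) }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    // Only reports what the JVM was started with; setting the property here
    // would be too late, because ObjectOutputStream reads it once at class init.
    val flag = sys.props.getOrElse("sun.io.serialization.extendedDebugInfo", "false")
    println(s"extendedDebugInfo=$flag")
    // Without the flag the message is just the offending class name; with it,
    // the JDK appends the chain of fields/objects that led to that class.
    serializationError(new GraphHolder).foreach(println)
  }
}
```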


On Thu, Aug 7, 2014 at 10:23 AM, Padmanabhan, Mahesh (contractor) <
[email protected]> wrote:

> Thanks TD, but unfortunately that did not work.
>
> From: Tathagata Das <[email protected]>
> Date: Thursday, August 7, 2014 at 10:55 AM
> To: Mahesh Padmanabhan <[email protected]>
> Cc: "[email protected]" <[email protected]>
> Subject: Re: Spark 1.0.1 NotSerialized exception (a bit of a head
> scratcher)
>
> It could be because of the variable "enableOpStat". Since it's defined
> outside foreachRDD, referring to it inside rdd.foreach is probably
> causing the whole streaming context to be included in the closure. Scala
> funkiness. Try this and see if it works.
>
> msgCount.join(ddCount).foreachRDD((rdd: RDD[(Int, (Long, Long))]) => {
>         val enable = enableOpStat  // the change: copy the field into a local val
>
>         rdd.foreach(item => item match {
>           case (key, (oc, dc)) => {
>             DebugLogger.log("Original event count = " + oc)
>             DebugLogger.log("Found " + (oc - dc) + " duplicate(s) in " + oc + " events")
>             if (enable) {  // refer to the local copy, not enableOpStat
>               try {
>                 val statBody = Array(("batchCount", oc.toString()),
>                   ("duplicateCount", (oc - dc).toString()))
>                 OperationalStatProducer.produce(statBody)
>               } catch { case e: Exception => DebugLogger.report(e) }
>             }
>           }
>         })
>       })
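A minimal, Spark-free sketch of the capture problem described above, under stated assumptions: it assumes enableOpStat is a field of a driver-side class that also holds the StreamingContext (the thread does not show where it is defined), and Scala 2.12 or later, where function literals compile to serializable lambdas. FakeStreamingContext, DedupJob and ClosureCaptureDemo are hypothetical names, and plain Java serialization stands in for the checkpoint writer, which the stack trace shows serializing the DStreamGraph through ObjectOutputStream. The outer function plays the role of the foreachRDD body and perItem the rdd.foreach body. Because checkpointing serializes the function passed to foreachRDD itself, copying the flag inside that function still leaves the outer closure reading the field, so only the variant that copies it before the outer function is built should serialize cleanly.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for StreamingContext: deliberately NOT Serializable.
class FakeStreamingContext

// Hypothetical driver-side class that owns both the context and the flag
// (an assumption about how enableOpStat is defined in the real application).
class DedupJob(val enableOpStat: Boolean) extends Serializable {
  val ssc = new FakeStreamingContext

  // Outer function ~ the foreachRDD body; perItem ~ the rdd.foreach body.
  // Original code: the inner closure reads the field, so both closures capture `this`.
  def original: Int => Boolean = batch => {
    val perItem: Int => Boolean = _ => enableOpStat
    perItem(batch)
  }

  // Copy inside the outer function: perItem is now clean, but the outer
  // closure still reads the field and therefore still captures `this`.
  def copyInsideOuter: Int => Boolean = batch => {
    val enable = enableOpStat
    val perItem: Int => Boolean = _ => enable
    perItem(batch)
  }

  // Copy before the outer function is created: neither closure touches `this`,
  // so only a Boolean is captured.
  def copyBeforeOuter: Int => Boolean = {
    val enable = enableOpStat
    batch => {
      val perItem: Int => Boolean = _ => enable
      perItem(batch)
    }
  }
}

object ClosureCaptureDemo {
  // Java-serializes obj (roughly what checkpointing does with the DStream graph)
  // and returns the NotSerializableException message, or None on success.
  def serializationError(obj: AnyRef): Option[String] = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); None }
    catch { case e: NotSerializableException => Some(e.getMessage) }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val job = new DedupJob(enableOpStat = true)
    println("original:        " + serializationError(job.original))
    println("copyInsideOuter: " + serializationError(job.copyInsideOuter))
    println("copyBeforeOuter: " + serializationError(job.copyBeforeOuter))
  }
}
```

The design point is where the copy happens: it has to run when the stream is defined, outside every closure that checkpointing serializes. If enableOpStat instead lives in a top-level object, referring to it captures nothing, and the culprit would be some other reference, which the extendedDebugInfo output should reveal.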
>
>
>
> On Thu, Aug 7, 2014 at 9:03 AM, Padmanabhan, Mahesh (contractor) <
> [email protected]> wrote:
>
>> Hello all,
>>
>> I am not sure what is going on: I am getting a NotSerializableException.
>> Initially I thought it was because I had not registered one of my classes
>> with Kryo, but that doesn't seem to be the case. My Spark Streaming
>> application removes duplicates from the current batch using a “window” and
>> sends de-duplication stats.
>>
>> The strange thing is that my application is not affected by this error at
>> all.
>>
>> I tried registering with Kryo like this (though it was more out of
>> desperation):
>>
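>> import com.esotericsoftware.kryo.Kryo
>> import org.apache.spark.serializer.KryoRegistrator
>>
>> class MyRegistrator extends KryoRegistrator {
>>   override def registerClasses(kryo: Kryo): Unit = {
>>     kryo.register(classOf[Long])
>>     // Type arguments are erased, so this registers plain scala.Tuple2
>>     kryo.register(classOf[Tuple2[Int, Tuple2[Long, Long]]])
>>   }
>> }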
>>
>> Immediately before the exception, a piece of code seems to be executed (at
>> rdd.foreach), though I am not sure whether it is the culprit. Here is the
>> code:
>>
>>     msgCount.join(ddCount).foreachRDD((rdd: RDD[(Int, (Long, Long))]) => {
>>         rdd.foreach(item => item match {
>>           case (key, (oc, dc)) => {
>>             DebugLogger.log("Original event count = " + oc)
>>             DebugLogger.log("Found "+(oc-dc)+" duplicate(s) in "+oc+" events")
>>             if (enableOpStat) {
>>               try {
>>                 val statBody = Array(("batchCount", oc.toString()),
>>                   ("duplicateCount", (oc-dc).toString()))
>>                 OperationalStatProducer.produce(statBody)
>>               } catch { case e: Exception => DebugLogger.report(e) }
>>             }
>>           }
>>         })
>>       })
>>
>> Here is the exception:
>>
>> ERROR akka.actor.OneForOneStrategy - org.apache.spark.streaming.StreamingContext
>> java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>         at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
>>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>         at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
>>         at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:168)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:606)
>>         at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>         at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:185)
>>         at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:259)
>>         at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:167)
>>         at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>
>
