Thanks TD, Amit.
I think I figured out where the problem is through the process of commenting
out individual lines of code one at a time :(
Can either of you help me find the right solution? I tried creating the
SparkContext outside the foreachRDD but that didn’t help.
I have an object (let’s say A) that is passed a SparkContext like this:
object A {
  def func1(sc: SparkContext) {
    // Do something with sc
  }
}
In my main object that creates the StreamingContext, I call object A’s func1
method like this:
val ssc = new StreamingContext(spark, Seconds(batchTime))
ssc.checkpoint(checkPointDir)
val messageStream = KafkaConsumer.messageStream(ssc)
messageStream.foreachRDD(rdd => {
  A.func1(ssc.sparkContext)
})
Seems like the call A.func1(ssc.sparkContext) above is the cause of the
exception.
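A possible workaround, sketched under assumptions and not verified against this job: the function passed to foreachRDD references ssc, so anything that serializes that function (for example checkpointing, which is enabled above) would also try to serialize the StreamingContext. Taking the SparkContext from the RDD itself avoids referencing ssc at all. The Wiring object, the attach helper and the String element type are hypothetical; rdd.sparkContext is the standard RDD accessor.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

object A {
  def func1(sc: SparkContext): Unit = {
    // Do something with sc
  }
}

// Hypothetical wrapper; the element type String is an assumption.
object Wiring {
  def attach(messageStream: DStream[String]): Unit = {
    messageStream.foreachRDD { (rdd: RDD[String]) =>
      // Use the SparkContext that owns this RDD instead of ssc.sparkContext,
      // so the function no longer references the StreamingContext.
      A.func1(rdd.sparkContext)
    }
  }
}
```

This needs the Spark core and streaming jars on the classpath to compile.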
Thanks,
Mahesh
From: Tathagata Das
Date: Thursday, August 7, 2014 at 1:11 PM
To: amit
Subject: Re: Spark 1.0.1 NotSerialized exception (a bit of a head scratcher)
From the extended info, I see that you have a function called
createStreamingContext() in your code. Somehow that is getting referenced
in the foreach function. Is the whole foreachRDD code inside the
createStreamingContext() function? Did you try marking the ssc field as
transient?
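To illustrate what marking a field as transient does, here is a minimal sketch using plain Java serialization. FakeContext, JobPlain, JobTransient and TransientDemo are hypothetical stand-ins, not Spark classes; FakeContext plays the role of the non-serializable ssc.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for StreamingContext: deliberately not Serializable.
class FakeContext

// The field is part of the serialized state, so writing the object fails.
class JobPlain(val ctx: FakeContext) extends Serializable

// @transient tells the serializer to skip the field.
class JobTransient(@transient val ctx: FakeContext) extends Serializable

object TransientDemo {
  // Returns true if obj can be written with Java serialization.
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(serializes(new JobPlain(new FakeContext)))     // false
    println(serializes(new JobTransient(new FakeContext))) // true
  }
}
```

Note that a transient field comes back as null after deserialization, so this only helps if the deserialized copy never touches it.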
Here is a significantly different approach. Put the whole function that is
applied to each item into an object.
object MyFunctions {
  def processItem(enable: Boolean)(item: (Int, (Long, Long))): Unit = {
    val (key, (oc, dc)) = item
    DebugLogger.log("Original event count = " + oc)
    DebugLogger.log("Found " + (oc - dc) + " duplicate(s) in " + oc + " events")
    // Use the parameter, not a field of the enclosing class
    if (enable) {
      try {
        val statBody = Array(("batchCount", oc.toString),
          ("duplicateCount", (oc - dc).toString))
        OperationalStatProducer.produce(statBody)
      } catch { case e: Exception => DebugLogger.report(e) }
    }
  }
}
And then use it like this:
msgCount.join(ddCount).foreachRDD((rdd: RDD[(Int, (Long, Long))]) => {
  // Copy the flag to a local val so the closure does not capture the enclosing object
  val enable = enableOpStat
  rdd.foreach(MyFunctions.processItem(enable) _)
})
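The reason for copying the flag into a local val can be shown with a small sketch outside Spark. Driver and ClosureDemo are hypothetical names; Driver stands in for the non-serializable class that owns the flag. A function that reads the field directly holds a reference to the whole object, while one that reads a local copy captures only the Boolean.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for the driver-side class; deliberately not Serializable.
class Driver(val enableOpStat: Boolean) {
  // Reads the field inside the function body, so the function references `this`.
  def capturingThis: Int => Boolean = (_: Int) => enableOpStat

  // Copies the field to a local val first, so the function captures only a Boolean.
  def capturingLocal: Int => Boolean = {
    val enable = enableOpStat
    (_: Int) => enable
  }
}

object ClosureDemo {
  // Returns true if obj can be written with Java serialization.
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val d = new Driver(true)
    println(serializes(d.capturingThis))  // false
    println(serializes(d.capturingLocal)) // true
  }
}
```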
On Thu, Aug 7, 2014 at 11:52 AM, amit wrote:
There is one more configuration option called spark.closure.serializer that
can be used to specify the serializer for closures.
Maybe the class has a StreamingContext as a field, so when Spark tries to
serialize the whole class it uses the spark.closure.serializer to serialize
the StreamingContext as well. Classes like StreamingContext may not work if
serialized and then deserialized in a different JVM(?).
So I see two solutions: one is to somehow avoid serializing the
StreamingContext; the other is to override the default serialization method
to serialize only the parameters required by the StreamingContext and
recreate it from those parameters in the deserialization step.
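The second idea (serialize only the parameters and rebuild the object afterwards) can be sketched as a general JVM pattern. FakeContext, ContextHolder and RecreateDemo are hypothetical stand-ins; whether rebuilding a real StreamingContext this way is appropriate is a separate question that this sketch does not answer.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a heavyweight context; deliberately not Serializable.
class FakeContext(val appName: String)

// Only appName is written during serialization. The context is rebuilt
// lazily from it the first time ctx is accessed after deserialization.
class ContextHolder(val appName: String) extends Serializable {
  @transient lazy val ctx: FakeContext = new FakeContext(appName)
}

object RecreateDemo {
  // Serializes obj to bytes and reads it back as a new instance.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val original = new ContextHolder("demo")
    original.ctx // force creation before serializing
    val copy = roundTrip(original)
    println(copy.ctx.appName)         // demo
    println(copy.ctx eq original.ctx) // false
  }
}
```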
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-1-NotSerialized-exception-a-bit-of-a-head-scratcher-tp11666p11703.html