LOL! Glad it solved it.

TD

On Thu, Aug 7, 2014 at 2:23 PM, Padmanabhan, Mahesh (contractor) <[email protected]> wrote:

> Slap my head moment – using rdd.context solved it!
>
> Thanks TD,
>
> Mahesh
>
> From: Tathagata Das <[email protected]>
> Date: Thursday, August 7, 2014 at 3:06 PM
> To: Mahesh Padmanabhan <[email protected]>
> Cc: amit <[email protected]>, "[email protected]" <[email protected]>
> Subject: Re: Spark 1.0.1 NotSerialized exception (a bit of a head scratcher)
>
> Well, I don't see the rdd in the foreachRDD being passed into A.func1(),
> so I am not sure what the purpose of the function is. Assuming that you do
> want to pass that RDD into the function and also want access to the
> SparkContext, you can pass only the RDD and then access the associated
> context as rdd.context. This avoids referring to the ssc inside the
> foreachRDD.
>
> See if that helps.
>
> TD
>
>
> On Thu, Aug 7, 2014 at 12:47 PM, Padmanabhan, Mahesh (contractor) <[email protected]> wrote:
>
>> Thanks TD, Amit.
>>
>> I think I figured out where the problem is through the process of
>> commenting out individual lines of code one at a time :(
>>
>> Can either of you help me find the right solution? I tried creating the
>> SparkContext outside the foreachRDD but that didn’t help.
>>
>> I have an object (let’s say A) that is passed a SparkContext like this:
>>
>> object A {
>>   def func1(sc: SparkContext) {
>>     // Do something with sc
>>   }
>> }
>>
>> In my main object that creates the StreamingContext, I call object A’s
>> func1 method like this:
>>
>> val ssc = new StreamingContext(spark, Seconds(batchTime))
>>
>> ssc.checkpoint(checkPointDir)
>>
>> val messageStream = KafkaConsumer.messageStream(ssc)
>>
>> messageStream.foreachRDD(rdd => {
>>   A.func1(ssc.sparkContext)
>> })
>>
>> Seems like the call A.func1(ssc.sparkContext) above is the cause of the
>> exception.
>>
>> Thanks,
>> Mahesh
>>
>> From: Tathagata Das <[email protected]>
>> Date: Thursday, August 7, 2014 at 1:11 PM
>> To: amit <[email protected]>
>> Cc: "[email protected]" <[email protected]>
>> Subject: Re: Spark 1.0.1 NotSerialized exception (a bit of a head scratcher)
>>
>> From the extended info, I see that you have a function called
>> createStreamingContext() in your code. Somehow that is getting referenced
>> in the foreach function. Is the whole foreachRDD code inside the
>> createStreamingContext() function? Did you try marking the ssc field as
>> transient?
>>
>> Here is a significantly different approach. Put the whole function to
>> apply on each item in an object.
>>
>> object MyFunctions {
>>   // The flag is passed in explicitly so the function does not refer to
>>   // any field of the enclosing class.
>>   def processItem(enable: Boolean)(item: (Int, (Long, Long))) = {
>>     val (key, (oc, dc)) = item
>>     DebugLogger.log("Original event count = " + oc)
>>     DebugLogger.log("Found " + (oc - dc) + " duplicate(s) in " + oc + " events")
>>     if (enable) {
>>       try {
>>         val statBody = Array(("batchCount", oc.toString()),
>>           ("duplicateCount", (oc - dc).toString()))
>>         OperationalStatProducer.produce(statBody)
>>       } catch { case e: Exception => DebugLogger.report(e) }
>>     }
>>   }
>> }
>>
>> And then use that:
>>
>> msgCount.join(ddCount).foreachRDD((rdd: RDD[(Int, (Long, Long))]) => {
>>   // Copy the flag into a local val so only the Boolean is captured.
>>   val enable = enableOptStat
>>   rdd.foreach(MyFunctions.processItem(enable) _)
>> })
>>
>>
>> On Thu, Aug 7, 2014 at 11:52 AM, amit <[email protected]> wrote:
>>
>>> There is one more configuration option called spark.closure.serializer
>>> that can be used to specify the serializer for closures.
>>>
>>> Maybe you have the StreamingContext as a field in the class, so when
>>> Spark tries to serialize the whole class it uses the
>>> spark.closure.serializer to serialize even the streaming context.
>>> Classes like StreamingContext may not work if serialized and
>>> deserialized in a different JVM(?).
>>>
>>> So I see two solutions: one is to somehow avoid serializing the
>>> StreamingContext; the other is to override the default serialization
>>> method to serialize only the params required by the streaming context
>>> and recreate it from those params in the deserialization step.
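
For anyone reading this thread later, the difference between the failing call and the rdd.context fix can be reproduced without Spark. The sketch below is only an illustration under stated assumptions: FakeContext, FakeRdd and ClosureDemo are hypothetical stand-ins invented here (they are not Spark classes), A.func1 is changed to take the RDD-like handle, and plain Java serialization stands in for whatever Spark does when it has to serialize the function passed to foreachRDD. It is meant to show that a function which captures a non-serializable context from the enclosing scope cannot be serialized, while one that reaches the context through its argument can.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-ins, NOT Spark classes: a context that is deliberately
// not Serializable, and an RDD-like handle that can reach its context.
class FakeContext(val appName: String)
class FakeRdd(val context: FakeContext)

object A {
  // Takes the RDD-like handle and looks the context up from it,
  // analogous to calling rdd.context inside func1.
  def func1(rdd: FakeRdd): String = rdd.context.appName
}

object ClosureDemo {
  // Returns true if plain Java serialization accepts the object.
  def serializes(f: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(f)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val ctx = new FakeContext("demo")

    // Captures ctx from the enclosing scope, like referring to ssc
    // inside the foreachRDD function.
    val capturing: FakeRdd => String = _ => ctx.appName

    // Captures nothing: the context is reached through the argument.
    val viaArgument: FakeRdd => String = rdd => A.func1(rdd)

    println(s"capturing serializes:   ${serializes(capturing)}")
    println(s"viaArgument serializes: ${serializes(viaArgument)}")
  }
}
```

Mapped back to the thread, the second closure corresponds to changing func1 to take the RDD and calling rdd.context inside it, so the function passed to foreachRDD no longer refers to ssc at all.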
