LOL! Glad it solved it.

TD


On Thu, Aug 7, 2014 at 2:23 PM, Padmanabhan, Mahesh (contractor) <
[email protected]> wrote:

> Slap my head moment – using rdd.context solved it!
>
> Thanks TD,
>
> Mahesh
>
> From: Tathagata Das <[email protected]>
> Date: Thursday, August 7, 2014 at 3:06 PM
> To: Mahesh Padmanabhan <[email protected]>
> Cc: amit <[email protected]>, "[email protected]" <
> [email protected]>
>
> Subject: Re: Spark 1.0.1 NotSerialized exception (a bit of a head
> scratcher)
>
> Well I dont see the rdd in the foreachRDD being passed into the A.func1()
> so I am not sure what is purpose of the function. Assuming that you do want
> to pass on that RDD into that function, and also want to have access to the
> sparkContext, you can only pass on the RDD and then access the associated
> context as rdd.context. This will avoid referring of the ssc  inside the
> foreachRDD.
>
> See if that helps.
>
> TD
>
>
> On Thu, Aug 7, 2014 at 12:47 PM, Padmanabhan, Mahesh (contractor) <
> [email protected]> wrote:
>
>> Thanks TD, Amit.
>>
>> I think I figured out where the problem is through the process of
>> commenting out individual lines of code one at a time :(
>>
>> Can either of you help me find the right solution? I tried creating the
>> SparkContext outside the foreachRDD but that didn’t help.
>>
>> I have an object (let’s say A) that is passed a SparkContext like this:
>>
>> object A {
>>   def func1(sc: SparkContext) {
>>     //Do something with sc
>> }
>>
>> In my main object that creates the StreamingContext, I call object A’s
>> func1 method like this:
>>
>> val ssc = new StreamingContext(spark, Seconds(batchTime))
>>
>> ssc.checkpoint(checkPointDir)
>>
>> val messageStream = KafkaConsumer.messageStream(ssc)
>>
>> messageStream.foreachRDD(rdd => {
>>    A.func1(ssc.sparkContext)
>> }
>>
>> Seems like the call A.func1(ssc.sparkContext) above is the cause of the
>> exception.
>>
>> Thanks,
>> Mahesh
>>
>> From: Tathagata Das <[email protected]>
>> Date: Thursday, August 7, 2014 at 1:11 PM
>> To: amit <[email protected]>
>> Cc: "[email protected]" <[email protected]>
>>
>> Subject: Re: Spark 1.0.1 NotSerialized exception (a bit of a head
>> scratcher)
>>
>> From the extended info, I see that you have a function called
>> createStreamingContext() in your code. Somehow that is getting referenced
>> in in the foreach function. Is the whole foreachRDD code inside the
>> createStreamingContext() function? Did you try marking the ssc field as
>> transient?
>>
>> Here is a significantly different approach. Put the whole function to
>> apply on each item in an object.
>>
>> object MyFunctions {
>>   def processItem(enable: Boolean)(item: (Int, (Long, Long)) = {
>>      val (key, (oc, dc)) = item
>>             DebugLogger.log("Original event count = " + oc)
>>             DebugLogger.log("Found "+(oc-dc)+" duplicate(s) in "+oc+"
>> events")
>>             if (enableOpStat) {
>>               try {
>>                 val statBody = Array(("batchCount", oc.toString()),
>>                   ("duplicateCount", (oc-dc).toString()))
>>                 OperationalStatProducer.produce(statBody)
>>               } catch { case e: Exception => DebugLogger.report(e) }
>>             }
>>           }
>>   }
>> }
>>
>>
>>
>> And then use that
>>
>> msgCount.join(ddCount).foreachRDD((rdd: RDD[(Int, (Long, Long))]) => {
>>         val enable = enableOptStat
>>         rdd.foreach(MyFunction.processItem(enable) _ )
>> }
>>
>>
>>
>> On Thu, Aug 7, 2014 at 11:52 AM, amit <[email protected]> wrote:
>>
>>> There is one more configuration option called spark.closure.serializer
>>> that
>>> can be used to specify serializer for closures.
>>>
>>> Maybe in the the class you have Streaming Context as a field, so when
>>> spark
>>> tries to serialize the whole class it uses the spark.closure.serializer
>>> to
>>> serialize even the streaming context. Classes like StreamingContext may
>>> not
>>> work if serialized and deserialized in a different JVM(?).
>>>
>>> So I see two solutions one is to somehow avoid serializing
>>> StreamingContext,
>>> other is to override the default serialization method to serialize only
>>> the
>>> params required by streaming context and recreate it in the serialization
>>> step from the params
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-1-NotSerialized-exception-a-bit-of-a-head-scratcher-tp11666p11703.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: [email protected]
>>> For additional commands, e-mail: [email protected]
>>>
>>>
>>
>> ------------------------------
>> This E-mail and any of its attachments may contain Time Warner Cable
>> proprietary information, which is privileged, confidential, or subject to
>> copyright belonging to Time Warner Cable. This E-mail is intended solely
>> for the use of the individual or entity to which it is addressed. If you
>> are not the intended recipient of this E-mail, you are hereby notified that
>> any dissemination, distribution, copying, or action taken in relation to
>> the contents of and attachments to this E-mail is strictly prohibited and
>> may be unlawful. If you have received this E-mail in error, please notify
>> the sender immediately and permanently delete the original and any copy of
>> this E-mail and any printout.
>>
>
>

Reply via email to