I now have a fair understanding of the situation after looking at the javap
output. So as a reminder:

dstream.map(tuple => {
  val w = StreamState.fetch[K, W](state.prefixKey, tuple._1)
  (tuple._1, (tuple._2, w))
})

And StreamState being a very simple standalone object:

object StreamState {
  def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String, key: K): Option[V] = None
}
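
If it helps to see why the enclosing class gets involved: context bounds are just sugar for implicit parameters, so the compiled fetch conceptually looks like the sketch below (a hand-written desugaring, not the actual javap output; the parameter names evK/ordK/evV are made up):

```scala
import scala.reflect.ClassTag

object StreamState {
  // Each context bound becomes an implicit evidence parameter that the
  // caller must supply at the call site, in declaration order:
  // K's ClassTag, K's Ordering, then V's ClassTag.
  def fetch[K, V](prefixKey: String, key: K)
                 (implicit evK: ClassTag[K], ordK: Ordering[K],
                  evV: ClassTag[V]): Option[V] = None
}
```

When fetch is called inside class A below, those implicits resolve to the evidence fields of A, which is what drags the enclosing instance into the closure.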

Basically the serialization failed because the ClassTag[K] came from the
enclosing class in which the dstream.map() code runs, e.g.:

class A[K : ClassTag](val dstream: DStream[K]) {
  [...]
  def fun =
    dstream.map(tuple => {
      val w = StreamState.fetch[K, W](state.prefixKey, tuple._1)
      (tuple._1, (tuple._2, w))
    })
}

Therefore the instance of class A gets serialized, and it fails when the
dstream field's writeObject() runs and checks the graph field...

The fact that graph is not set might be expected given that I have not
started the context yet...
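
In case anyone hits the same thing: copying the implicit into a local val before building the closure avoids capturing the enclosing instance. Below is a minimal self-contained sketch without Spark, where a non-serializable Outer class and plain Java serialization stand in for the class holding the DStream and for Spark's closure check; all names are made up:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import scala.reflect.ClassTag

object CaptureDemo {
  // Outer is deliberately NOT Serializable, like a class holding a DStream
  // whose graph is not set up for serialization.
  class Outer[K : ClassTag] {
    // implicitly[ClassTag[K]] reads the evidence field of Outer, so this
    // lambda captures `this` (the whole Outer instance).
    def badClosure: K => String =
      k => implicitly[ClassTag[K]].toString

    // Workaround: copy the implicit into a local val first; the lambda
    // then captures only the (serializable) ClassTag, not `this`.
    def goodClosure: K => String = {
      val tag = implicitly[ClassTag[K]]
      k => tag.toString
    }
  }

  // Poor man's closure check: try to Java-serialize the object.
  def serializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val outer = new Outer[Int]
    println(serializable(outer.badClosure))  // false: captured Outer
    println(serializable(outer.goodClosure)) // true: only the local tag
  }
}
```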

Cheers,


On Tue, Apr 21, 2015 at 6:17 PM, Tathagata Das <t...@databricks.com> wrote:

> It is kind of unexpected, I can't imagine a real scenario under which it
> should trigger. But obviously I am missing something :)
>
> TD
>
> On Tue, Apr 21, 2015 at 4:23 PM, Jean-Pascal Billaud <j...@tellapart.com>
> wrote:
>
>> Sure. But in general, I am assuming this "Graph is unexpectedly null
>> when DStream is being serialized" must mean something. Under what
>> circumstances would such an exception trigger?
>>
>> On Tue, Apr 21, 2015 at 4:12 PM, Tathagata Das <t...@databricks.com>
>> wrote:
>>
>>> Yeah, I am not sure what is going on. The only way to figure it out is to
>>> take a look at the disassembled bytecode using javap.
>>>
>>> TD
>>>
>>> On Tue, Apr 21, 2015 at 1:53 PM, Jean-Pascal Billaud <j...@tellapart.com>
>>> wrote:
>>>
>>>> At this point I am assuming that nobody has an idea... I am still going
>>>> to give it a last shot just in case it was missed by some people :)
>>>>
>>>> Thanks,
>>>>
>>>> On Mon, Apr 20, 2015 at 2:20 PM, Jean-Pascal Billaud <j...@tellapart.com>
>>>> wrote:
>>>>
>>>>> Hey, so I start the context at the very end when all the piping is
>>>>> done. BTW a foreachRDD will be called on the resulting dstream.map() right
>>>>> after that.
>>>>>
>>>>> The puzzling thing is why removing the context bounds solves the
>>>>> problem... What does this exception mean in general?
>>>>>
>>>>> On Mon, Apr 20, 2015 at 1:33 PM, Tathagata Das <t...@databricks.com>
>>>>> wrote:
>>>>>
>>>>>> When are you getting this exception? After starting the context?
>>>>>>
>>>>>> TD
>>>>>>
>>>>>> On Mon, Apr 20, 2015 at 10:44 AM, Jean-Pascal Billaud <
>>>>>> j...@tellapart.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am getting this serialization exception and I am not too sure what
>>>>>>> "Graph is unexpectedly null when DStream is being serialized" means?
>>>>>>>
>>>>>>> 15/04/20 06:12:38 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Task not serializable)
>>>>>>> Exception in thread "Driver" org.apache.spark.SparkException: Task not serializable
>>>>>>>         at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>>>>>>>         at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>>>>>>>         at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
>>>>>>>         at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
>>>>>>>         [...]
>>>>>>> Caused by: java.io.NotSerializableException: Graph is unexpectedly null when DStream is being serialized.
>>>>>>>         at org.apache.spark.streaming.dstream.DStream$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:420)
>>>>>>>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
>>>>>>>         at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:403)
>>>>>>>
>>>>>>> The operation comes down to something like this:
>>>>>>>
>>>>>>> dstream.map(tuple => {
>>>>>>>   val w = StreamState.fetch[K, W](state.prefixKey, tuple._1)
>>>>>>>   (tuple._1, (tuple._2, w))
>>>>>>> })
>>>>>>>
>>>>>>> And StreamState being a very simple standalone object:
>>>>>>>
>>>>>>> object StreamState {
>>>>>>>   def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String, key: K): Option[V] = None
>>>>>>> }
>>>>>>>
>>>>>>> However, if I remove the context bounds from K in fetch, i.e. remove
>>>>>>> ClassTag and Ordering, then everything is fine.
>>>>>>>
>>>>>>> If anyone has some pointers, I'd really appreciate it.
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
