I now have a fair understanding of the situation after looking at the javap output. So, as a reminder:
dstream.map(tuple => {
  val w = StreamState.fetch[K,W](state.prefixKey, tuple._1)
  (tuple._1, (tuple._2, w))
})

And StreamState being a very simple standalone object:

object StreamState {
  def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String, key: K): Option[V] = None
}

Basically the serialization failed because the ClassTag[K] came from the enclosing class in which the dstream.map() code is running, e.g.:

class A[K : ClassTag](val dstream: DStream[K]) {
  [...]
  def fun = dstream.map(tuple => {
    val w = StreamState.fetch[K,W](state.prefixKey, tuple._1)
    (tuple._1, (tuple._2, w))
  })
}

Therefore the instance of class A is being serialized, and it fails when the dstream field's writeObject() runs and checks the graph field... The fact that graph is not set might be expected given that I have not started the context yet... A sketch of a possible workaround (hoisting the implicit evidence into local vals) is appended after the quoted thread below.

Cheers,

On Tue, Apr 21, 2015 at 6:17 PM, Tathagata Das <t...@databricks.com> wrote:

> It is kind of unexpected, I can't imagine a real scenario under which it
> should trigger. But obviously I am missing something :)
>
> TD
>
> On Tue, Apr 21, 2015 at 4:23 PM, Jean-Pascal Billaud <j...@tellapart.com> wrote:
>
>> Sure. But in general, I am assuming this "Graph is unexpectedly null
>> when DStream is being serialized" must mean something. Under which
>> circumstances would such an exception trigger?
>>
>> On Tue, Apr 21, 2015 at 4:12 PM, Tathagata Das <t...@databricks.com> wrote:
>>
>>> Yeah, I am not sure what is going on. The only way to figure it out is
>>> to take a look at the disassembled bytecode using javap.
>>>
>>> TD
>>>
>>> On Tue, Apr 21, 2015 at 1:53 PM, Jean-Pascal Billaud <j...@tellapart.com> wrote:
>>>
>>>> At this point I am assuming that nobody has an idea... I am still going
>>>> to give it a last shot just in case it was missed by some people :)
>>>>
>>>> Thanks,
>>>>
>>>> On Mon, Apr 20, 2015 at 2:20 PM, Jean-Pascal Billaud <j...@tellapart.com> wrote:
>>>>
>>>>> Hey, so I start the context at the very end when all the piping is
>>>>> done. BTW a foreachRDD will be called on the resulting dstream.map()
>>>>> right after that.
>>>>>
>>>>> The puzzling thing is why removing the context bounds solves the
>>>>> problem... What does this exception mean in general?
>>>>>
>>>>> On Mon, Apr 20, 2015 at 1:33 PM, Tathagata Das <t...@databricks.com> wrote:
>>>>>
>>>>>> When are you getting this exception? After starting the context?
>>>>>>
>>>>>> TD
>>>>>>
>>>>>> On Mon, Apr 20, 2015 at 10:44 AM, Jean-Pascal Billaud <j...@tellapart.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am getting this serialization exception and I am not too sure what
>>>>>>> "Graph is unexpectedly null when DStream is being serialized" means:
>>>>>>>
>>>>>>> 15/04/20 06:12:38 INFO yarn.ApplicationMaster: Final app status:
>>>>>>> FAILED, exitCode: 15, (reason: User class threw exception: Task not
>>>>>>> serializable)
>>>>>>> Exception in thread "Driver" org.apache.spark.SparkException: Task not serializable
>>>>>>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>>>>>>>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>>>>>>>     at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
>>>>>>>     at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
>>>>>>>     [...]
>>>>>>> Caused by: java.io.NotSerializableException: Graph is unexpectedly
>>>>>>> null when DStream is being serialized.
>>>>>>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:420)
>>>>>>>     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
>>>>>>>     at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:403)
>>>>>>>
>>>>>>> The operation comes down to something like this:
>>>>>>>
>>>>>>> dstream.map(tuple => {
>>>>>>>   val w = StreamState.fetch[K,W](state.prefixKey, tuple._1)
>>>>>>>   (tuple._1, (tuple._2, w))
>>>>>>> })
>>>>>>>
>>>>>>> And StreamState being a very simple standalone object:
>>>>>>>
>>>>>>> object StreamState {
>>>>>>>   def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String, key: K): Option[V] = None
>>>>>>> }
>>>>>>>
>>>>>>> However, if I remove the context bounds from K in fetch, i.e. removing
>>>>>>> ClassTag and Ordering, then everything is fine.
>>>>>>>
>>>>>>> If anyone has some pointers, I'd really appreciate it.
>>>>>>>
>>>>>>> Thanks,
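
For reference, here is a minimal sketch of one way to avoid the capture, assuming the diagnosis above: hoist the implicit evidence (ClassTag/Ordering) into local vals before calling map, so the generated closure references only those locals instead of the enclosing instance of A. The extra type parameters, the ClassTag on W, and the statePrefixKey parameter standing in for state.prefixKey are assumptions added to keep the sketch self-contained; they are not code from the thread above.

import scala.reflect.ClassTag
import org.apache.spark.streaming.dstream.DStream

// StreamState as posted in the thread.
object StreamState {
  def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String, key: K): Option[V] = None
}

// Hypothetical enclosing class; the original only shows `class A[K : ClassTag](val dstream: DStream[K])`.
class A[K : ClassTag : Ordering, V, W : ClassTag](val dstream: DStream[(K, V)],
                                                  val statePrefixKey: String) {

  def fun: DStream[(K, (V, Option[W]))] = {
    // Copy the implicit evidence and the needed field into plain local vals.
    // The closure below then captures only these small serializable values,
    // not `this`, so the DStream field of A never gets dragged into task
    // serialization.
    val kTag = implicitly[ClassTag[K]]
    val kOrd = implicitly[Ordering[K]]
    val wTag = implicitly[ClassTag[W]]
    val prefixKey = statePrefixKey

    dstream.map { tuple =>
      // Pass the evidence explicitly so no implicit lookup reaches back to
      // the enclosing instance of A.
      val w = StreamState.fetch[K, W](prefixKey, tuple._1)(kTag, kOrd, wTag)
      (tuple._1, (tuple._2, w))
    }
  }
}

With the evidence hoisted into locals, javap on the compiled closure class should no longer show a reference to A.this, so the DStream field is not part of the serialized closure and the "Graph is unexpectedly null" path in DStream.writeObject should not be reached.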