At this point I am assuming that nobody has an idea... I am still going to give it a last shot just in case it was missed by some people :)
Thanks,

On Mon, Apr 20, 2015 at 2:20 PM, Jean-Pascal Billaud <j...@tellapart.com> wrote:

> Hey, so I start the context at the very end when all the piping is done.
> BTW a foreachRDD will be called on the resulting dstream.map() right after
> that.
>
> The puzzling thing is why removing the context bounds solves the problem...
> What does this exception mean in general?
>
> On Mon, Apr 20, 2015 at 1:33 PM, Tathagata Das <t...@databricks.com>
> wrote:
>
>> When are you getting this exception? After starting the context?
>>
>> TD
>>
>> On Mon, Apr 20, 2015 at 10:44 AM, Jean-Pascal Billaud <j...@tellapart.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I am getting this serialization exception and I am not too sure what
>>> "Graph is unexpectedly null when DStream is being serialized" means?
>>>
>>> 15/04/20 06:12:38 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Task not serializable)
>>> Exception in thread "Driver" org.apache.spark.SparkException: Task not serializable
>>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>>>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>>>     at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
>>>     at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
>>>     [...]
>>> Caused by: java.io.NotSerializableException: Graph is unexpectedly null when DStream is being serialized.
>>>     at org.apache.spark.streaming.dstream.DStream$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:420)
>>>     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
>>>     at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:403)
>>>
>>> The operation comes down to something like this:
>>>
>>> dstream.map(tuple => {
>>>   val w = StreamState.fetch[K,W](state.prefixKey, tuple._1)
>>>   (tuple._1, (tuple._2, w)) })
>>>
>>> And StreamState being a very simple standalone object:
>>>
>>> object StreamState {
>>>   def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String, key: K) : Option[V] = None
>>> }
>>>
>>> However, if I remove the context bounds from K in fetch, e.g. removing
>>> ClassTag and Ordering, then everything is fine.
>>>
>>> If anyone has some pointers, I'd really appreciate it.
>>>
>>> Thanks,