At this point I am assuming that nobody has an idea... I am still going to
give it a last shot just in case it was missed by some people :)

Thanks,

On Mon, Apr 20, 2015 at 2:20 PM, Jean-Pascal Billaud <j...@tellapart.com>
wrote:

> Hey, so I start the context at the very end when all the piping is done.
> BTW a foreachRDD will be called on the resulting dstream.map() right after
> that.
>
> The puzzling thing is why removing the context bounds solves the problem...
> What does this exception mean in general?
>
> On Mon, Apr 20, 2015 at 1:33 PM, Tathagata Das <t...@databricks.com>
> wrote:
>
>> When are you getting this exception? After starting the context?
>>
>> TD
>>
>> On Mon, Apr 20, 2015 at 10:44 AM, Jean-Pascal Billaud <j...@tellapart.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I am getting this serialization exception and I am not too sure what
>>> "Graph is unexpectedly null when DStream is being serialized" means.
>>>
>>> 15/04/20 06:12:38 INFO yarn.ApplicationMaster: Final app status: FAILED,
>>> exitCode: 15, (reason: User class threw exception: Task not serializable)
>>> Exception in thread "Driver" org.apache.spark.SparkException: Task not
>>> serializable
>>>         at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>>>         at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>>>         at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
>>>         at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
>>>         [...]
>>> Caused by: java.io.NotSerializableException: Graph is unexpectedly null when DStream is being serialized.
>>>         at org.apache.spark.streaming.dstream.DStream$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:420)
>>>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
>>>         at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:403)
>>>
>>> The operation comes down to something like this:
>>>
>>> dstream.map(tuple => {
>>>   val w = StreamState.fetch[K, W](state.prefixKey, tuple._1)
>>>   (tuple._1, (tuple._2, w))
>>> })
>>>
>>> And StreamState being a very simple standalone object:
>>>
>>> object StreamState {
>>>   def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String, key: K): Option[V] = None
>>> }
>>>
>>> However, if I remove the context bounds from K in fetch (i.e. remove
>>> ClassTag and Ordering), then everything is fine.
>>>
>>> If anyone has some pointers, I'd really appreciate it.
>>>
>>> Thanks,
>>>
>>
>>
>

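For anyone reading this thread later, here is a minimal sketch of one way context bounds can drag extra state into a closure. It is not a confirmed diagnosis of the job above. What is certain is that a context bound such as `K : ClassTag` is compiled into an extra implicit parameter, and that the trace shows DStream.writeObject being reached from ClosureCleaner.ensureSerializable, so a DStream was reachable from the closure handed to map(). The sketch assumes the call to fetch sits inside a generic class whose own context bounds supply the implicits. `Pipeline`, `fetchNoBounds`, `withBounds`, `withoutBounds` and `serializes` are hypothetical names used only for illustration, and plain Java serialization stands in for Spark's closure check.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.reflect.ClassTag
import scala.util.Try

object StreamState {
  // As posted in the thread. The bounds desugar to three implicit parameters:
  // ClassTag[K], Ordering[K] and ClassTag[V].
  def fetch[K: ClassTag: Ordering, V: ClassTag](prefixKey: String, key: K): Option[V] = None

  // Hypothetical variant with the bounds removed: no implicit parameters at all.
  def fetchNoBounds[K, V](prefixKey: String, key: K): Option[V] = None
}

// Hypothetical stand-in for the class that wires up the stream.
// It is deliberately NOT Serializable, like an object holding a DStream.
class Pipeline[K: ClassTag: Ordering, W: ClassTag] {

  // The implicits that fetch needs are stored as fields of this instance,
  // so the function literal has to reference `this` to reach them.
  def withBounds(prefix: String): ((K, Int)) => (K, (Int, Option[W])) =
    tuple => (tuple._1, (tuple._2, StreamState.fetch[K, W](prefix, tuple._1)))

  // No implicits are needed here, so only `prefix` is captured.
  def withoutBounds(prefix: String): ((K, Int)) => (K, (Int, Option[W])) =
    tuple => (tuple._1, (tuple._2, StreamState.fetchNoBounds[K, W](prefix, tuple._1)))
}

// Try to Java-serialize a value and report whether it worked.
def serializes(obj: AnyRef): Boolean =
  Try(new ObjectOutputStream(new ByteArrayOutputStream).writeObject(obj)).isSuccess
```

With `val p = new Pipeline[String, Long]`, comparing `serializes(p.withBounds("pfx"))` against `serializes(p.withoutBounds("pfx"))` shows whether the enclosing instance ends up in the closure. If the real code has a similar shape, copying what the closure needs into local vals before calling map() is one thing to try.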