I believe this is needed for driver recovery in Spark Streaming. If your Spark 
driver program crashes, Spark Streaming can recover the application by reading 
the set of DStreams and output operations from a checkpoint file (see 
https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing).
 But to do that, it needs to remember all the operations you're running 
periodically, including those in foreachRDD.

Matei

> On Jan 27, 2015, at 6:15 PM, Tobias Pfeiffer <t...@preferred.jp> wrote:
> 
> Hi,
> 
> I want to do something like
> 
> dstream.foreachRDD(rdd => if (someCondition) ssc.stop())
> 
> so in particular the function does not touch any element in the RDD and runs 
> completely within the driver. However, this fails with a 
> NotSerializableException because $outer is not serializable etc. The DStream 
> code says:
> 
>   def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
>     // because the DStream is reachable from the outer object here, and 
> because 
>     // DStreams can't be serialized with closures, we can't proactively check 
>     // it for serializability and so we pass the optional false to 
> SparkContext.clean
>     new ForEachDStream(this, context.sparkContext.clean(foreachFunc, 
> false)).register()
>   }
> 
> To be honest, I don't understand the comment. Why must that function be 
> serializable even when there is no RDD action involved?
> 
> Thanks
> Tobias


---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to