Hey Tobias,

I think one consideration is for checkpoint of DStream which guarantee driver 
fault tolerance.

Also this `foreachFunc` is more like an action function of RDD, thinking of 
rdd.foreach(func), in which `func` need to be serializable. So maybe I think 
your way of use it is not a normal way :).

Thanks
Jerry

From: Tobias Pfeiffer [mailto:t...@preferred.jp]
Sent: Wednesday, January 28, 2015 10:16 AM
To: user
Subject: Why must the dstream.foreachRDD(...) parameter be serializable?

Hi,

I want to do something like

dstream.foreachRDD(rdd => if (someCondition) ssc.stop())

so in particular the function does not touch any element in the RDD and runs 
completely within the driver. However, this fails with a 
NotSerializableException because $outer is not serializable etc. The DStream 
code says:

  def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
    // because the DStream is reachable from the outer object here, and because
    // DStreams can't be serialized with closures, we can't proactively check
    // it for serializability and so we pass the optional false to 
SparkContext.clean
    new ForEachDStream(this, context.sparkContext.clean(foreachFunc, 
false)).register()
  }

To be honest, I don't understand the comment. Why must that function be 
serializable even when there is no RDD action involved?

Thanks
Tobias

Reply via email to