Thanks, Tathagata. This and your other reply on awaitTermination are very helpful.
Diana

On Thu, Mar 27, 2014 at 4:40 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> Very good questions! Responses inline.
>
> TD
>
> On Thu, Mar 27, 2014 at 8:02 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
>
> > I'm working with Spark Streaming using spark-shell, and hoping folks could
> > answer a few questions I have.
> >
> > I'm doing WordCount on a socket stream:
> >
> > import org.apache.spark.streaming.StreamingContext
> > import org.apache.spark.streaming.StreamingContext._
> > import org.apache.spark.streaming.Seconds
> > var ssc = new StreamingContext(sc, Seconds(5))
> > var mystream = ssc.socketTextStream("localhost", 4444)
> > var words = mystream.flatMap(line => line.split(" "))
> > var wordCounts = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
> > wordCounts.print()
> > ssc.start()
> >
> > 1. I'm assuming that using the Spark shell is an edge case, and that Spark
> > Streaming is really intended mostly for batch use. True?
>
> Yes. Currently the spark-shell is not the intended execution mode for
> Spark Streaming, even though it can be used for quick testing.
>
> > 2. I notice that once I call ssc.start(), my stream starts processing and
> > continues indefinitely...even if I close the socket on the server end (I'm
> > using the Unix command "nc" to mimic a server, as explained in the
> > streaming programming guide.) Can I tell my stream to detect that it has
> > lost the connection and therefore stop executing? (Or even better, to
> > attempt to re-establish the connection?)
>
> Currently, not yet. But I am aware of this, and this behavior will be
> improved in the future.
>
> > 3. I tried entering ssc.stop, which resulted in an error:
> >
> > Exception in thread "Thread-43" org.apache.spark.SparkException: Job
> > cancelled because SparkContext was shut down
> > 14/03/27 07:36:13 ERROR ConnectionManager: Corresponding
> > SendingConnectionManagerId not found
> >
> > But it did stop the DStream execution.
>
> Ah, that happens sometimes. The existing behavior of ssc.stop() is that
> it will stop everything immediately.
> I just opened a pull request for a more graceful shutdown of a Spark
> Streaming program.
> https://github.com/apache/spark/pull/247
>
> > 4. Then I tried restarting the ssc again (ssc.start) and got another error:
> >
> > org.apache.spark.SparkException: JobScheduler already started
> >
> > Is restarting an ssc supported?
>
> Restarting is ideally not supported. However, the behavior was not
> explicitly checked. The above pull request makes the behavior more
> explicit by throwing the right warnings and exceptions.
>
> > 5. When I perform an operation like wordCounts.print(), that operation
> > will execute on each batch, every n seconds. Is there a way I can undo
> > that operation? That is, I want it to *stop* executing that print every
> > n seconds...without having to stop the stream.
> >
> > What I'm really asking is...can I explore DStreams interactively the way
> > I can explore my data in regular Spark? In regular Spark, I might perform
> > various operations on an RDD to see what happens. So at first, I might
> > have used split(" ") to tokenize my input text, but now I want to try
> > using split(",") instead, after the stream has already started running.
> > Can I do that?
> >
> > I did find out that if I add a new operation to an existing DStream (say,
> > words.print()) after the ssc.start(), it works. It *will* add the second
> > print() call to the execution list every n seconds.
> > But if I try to add new DStreams, e.g.
> > ...
> >
> > ssc.start()
> >
> > var testpairs = words.map(x => (x, "TEST"))
> > testpairs.print()
> >
> > I get an error:
> >
> > 14/03/27 07:57:50 ERROR JobScheduler: Error generating jobs for time
> > 1395932270000 ms
> > java.lang.Exception:
> > org.apache.spark.streaming.dstream.MappedDStream@84f0f92 has not been
> > initialized
> >
> > Is this sort of interactive use just not supported?
>
> Modifying the DStream operations after the context has started is not
> officially supported. However, dynamically changing the computation can
> be done using DStream.transform() or DStream.foreachRDD().
> Both of these operations allow you to do arbitrary RDD operations on each
> RDD. So you can dynamically modify which RDD operations are used within
> the DStream transform / foreachRDD (you are not changing the DStream
> operations, only what's inside the DStream operation). But to use this
> really interactively, you have to write a bit of additional code that
> allows the user to interactively specify the function applied on each RDD.
>
> > Thanks!
> >
> > Diana
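
For reference, a rough sketch of the transform()-based approach described
above might look like the following in spark-shell. This is only an
illustration, not code from the thread: the variable name "tokenize" is
made up for the example, and it assumes the usual "sc" provided by the
shell.

import org.apache.spark.SparkContext._   // reduceByKey on RDDs (implicit in newer Spark)
import org.apache.spark.streaming.{Seconds, StreamingContext}

var ssc = new StreamingContext(sc, Seconds(5))
var mystream = ssc.socketTextStream("localhost", 4444)

// Swappable per-batch logic; @volatile so the job generator on the driver
// sees reassignments made from the shell.
@volatile var tokenize: String => Seq[String] = _.split(" ").toSeq

var wordCounts = mystream.transform { rdd =>
  val f = tokenize  // capture the current function for this batch only
  rdd.flatMap(f).map(word => (word, 1)).reduceByKey(_ + _)
}
wordCounts.print()
ssc.start()

// Later, without stopping the stream, switch the tokenizer from the shell:
tokenize = _.split(",").toSeq

The DStream graph itself never changes here; only the function looked up
inside the transform closure does, which is the distinction TD draws
between changing the DStream operations and changing what runs inside them.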