Thanks, Tathagata.  This and your other reply on awaitTermination are very
helpful.

Diana


On Thu, Mar 27, 2014 at 4:40 PM, Tathagata Das
<tathagata.das1...@gmail.com> wrote:

> Very good questions! Responses inline.
>
> TD
>
> On Thu, Mar 27, 2014 at 8:02 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
> > I'm working with Spark Streaming using the spark-shell, and hoping folks
> > could answer a few questions I have.
> >
> > I'm doing WordCount on a socket stream:
> >
> > import org.apache.spark.streaming.StreamingContext
> > import org.apache.spark.streaming.StreamingContext._
> > import org.apache.spark.streaming.Seconds
> > var ssc = new StreamingContext(sc,Seconds(5))
> > var mystream = ssc.socketTextStream("localhost",4444)
> > var words = mystream.flatMap(line => line.split(" "))
> > var wordCounts = words.map(x => (x, 1)).reduceByKey((x,y) => x+y)
> > wordCounts.print()
> > ssc.start()
> >
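> > (To feed it data, I'm running something like "nc -lk 4444" in another
> > terminal, as described in the streaming programming guide.)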
> >
> >
> > 1.  I'm assuming that using the spark-shell is an edge case, and that
> > Spark Streaming is really intended mostly for batch use.  True?
> >
>
> Yes. Currently the spark-shell is not the intended execution mode for
> Spark Streaming, though it can be used for quick testing.
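>
> For reference, the more typical setup is a standalone program along these
> lines (a rough sketch; the object name, master URL, and port are
> placeholders):
>
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.StreamingContext._
>
> object NetworkWordCount {
>   def main(args: Array[String]) {
>     // two local threads: one for the socket receiver, one for processing
>     val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
>     val ssc = new StreamingContext(conf, Seconds(5))
>     val words = ssc.socketTextStream("localhost", 4444).flatMap(line => line.split(" "))
>     words.map(word => (word, 1)).reduceByKey((x, y) => x + y).print()
>     ssc.start()
>     ssc.awaitTermination()   // block until the streaming computation is stopped
>   }
> }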
>
> > 2.  I notice that once I call ssc.start(), my stream starts processing and
> > continues indefinitely...even if I close the socket on the server end (I'm
> > using the unix command "nc" to mimic a server as explained in the streaming
> > programming guide.)  Can I tell my stream to detect if it's lost a
> > connection and therefore stop executing?  (Or even better, to attempt to
> > re-establish the connection?)
> >
>
>
> Not yet. But I am aware of this, and this behavior will be improved in
> the future.
>
> > 3.  I tried entering ssc.stop which resulted in an error:
> >
> > Exception in thread "Thread-43" org.apache.spark.SparkException: Job
> > cancelled because SparkContext was shut down
> > 14/03/27 07:36:13 ERROR ConnectionManager: Corresponding
> > SendingConnectionManagerId not found
> >
> > But it did stop the DStream execution.
> >
>
>
> Ah, that happens sometimes. The existing behavior of ssc.stop() is
> that it stops everything immediately.
> I just opened a pull request for a more graceful shutdown of a Spark
> Streaming program:
> https://github.com/apache/spark/pull/247
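>
> Once that change lands, a graceful stop from the shell should look roughly
> like this (a sketch; the exact parameter names may differ in the final
> version):
>
> // finish processing the data already received, but keep the shell's
> // SparkContext alive so sc remains usable
> ssc.stop(stopSparkContext = false, stopGracefully = true)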
>
> > 4.  Then I tried restarting the ssc (ssc.start) and got another error:
> > org.apache.spark.SparkException: JobScheduler already started
> > Is restarting an ssc supported?
> >
>
>
> Ideally, restarting should not be allowed, but the behavior was not
> explicitly checked. The above pull request makes the behavior more
> explicit by throwing the right warnings and exceptions.
>
> > 5.  When I perform an operation like wordCounts.print(), that operation
> > will execute on each batch, every n seconds.  Is there a way I can undo
> > that operation?  That is, I want it to *stop* executing that print every
> > n seconds...without having to stop the stream.
> >
> > What I'm really asking is...can I explore DStreams interactively the way
> > I can explore my data in regular Spark?  In regular Spark, I might perform
> > various operations on an RDD to see what happens.  So at first, I might
> > have used split(" ") to tokenize my input text, but now I want to try
> > using split(",") instead, after the stream has already started running.
> > Can I do that?
> >
> > I did find out that if I add a new operation to an existing DStream (say,
> > words.print()) after ssc.start(), it works. It *will* add the second
> > print() call to the execution list every n seconds.
> >
> > But if I try to add new DStreams, e.g.
> > ...
> >
> > ssc.start()
> >
> > var testpairs = words.map(x => (x, "TEST"))
> > testpairs.print()
> >
> >
> > I get an error:
> >
> > 14/03/27 07:57:50 ERROR JobScheduler: Error generating jobs for time
> > 1395932270000 ms
> > java.lang.Exception:
> > org.apache.spark.streaming.dstream.MappedDStream@84f0f92 has not been
> > initialized
> >
> >
> > Is this sort of interactive use just not supported?
>
>
> Modifying the DStream operations after the context has started is not
> officially supported. However, dynamically changing the computation can
> be done using DStream.transform() or DStream.foreachRDD().
> Both of these operations allow you to do arbitrary RDD operations on each
> RDD. So you can dynamically modify what RDD operations are used within
> the transform / foreachRDD (so you are not changing the
> DStream operations, only what's inside the DStream operation). But to
> use this really interactively, you have to write a bit of additional
> code that allows the user to interactively specify the function
> applied on each RDD.
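>
> As a rough sketch (continuing the shell session above; the var name is
> mine and this is untested), it could look something like:
>
> // hold the per-batch logic in a var that can be reassigned from the shell
> var process: org.apache.spark.rdd.RDD[String] => Unit =
>   rdd => rdd.map(word => (word, 1)).reduceByKey((x, y) => x + y).take(10).foreach(println)
>
> // register once, before ssc.start(); every batch invokes whatever
> // `process` currently points to
> words.foreachRDD(rdd => process(rdd))
> ssc.start()
>
> // later, change the computation without adding new DStream operations
> process = rdd => rdd.map(word => (word, "TEST")).take(10).foreach(println)
>
> Since the function registered with foreachRDD runs on the driver, it picks
> up the latest value of the var on every batch.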
>
>
>
> >
> > Thanks!
> >
> > Diana
>
