I'm working with Spark Streaming in spark-shell, and hoping folks could
answer a few questions I have.

I'm doing WordCount on a socket stream:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._   // implicits for reduceByKey etc.
import org.apache.spark.streaming.Seconds

val ssc = new StreamingContext(sc, Seconds(5))          // 5-second batch interval
val mystream = ssc.socketTextStream("localhost", 4444)  // one line of text per record
val words = mystream.flatMap(line => line.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
wordCounts.print()                                      // output operation, runs every batch
ssc.start()



1.  I'm assuming that using spark-shell is an edge case, and that Spark
Streaming is really intended mostly for standalone batch applications.
True?

2.  I notice that once I call ssc.start(), my stream starts processing and
continues indefinitely... even if I close the socket on the server end (I'm
using the unix command "nc" to mimic a server, as explained in the streaming
programming guide). Can I tell my stream to detect that it's lost its
connection and therefore stop executing? (Or even better, to attempt to
re-establish the connection?)
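
I haven't found a built-in option for this on socketTextStream. Here's a
rough sketch of what I imagine a reconnecting receiver would look like,
assuming a release with the pluggable Receiver API
(org.apache.spark.streaming.receiver.Receiver), where restart() asks Spark
to tear the receiver down and set it up again:

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class ReconnectingReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Do the blocking reads on a separate thread so onStart() returns quickly.
    new Thread("Reconnecting Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {}  // the read loop below checks isStopped()

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line)              // hand each received line to Spark
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      // readLine() returned null: the server closed the connection.
      restart("Connection closed by server, reconnecting")
    } catch {
      case t: Throwable => restart("Error receiving data, reconnecting", t)
    }
  }
}

// Used in place of socketTextStream:
// val mystream = ssc.receiverStream(new ReconnectingReceiver("localhost", 4444))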

3.  I tried entering ssc.stop(), which resulted in an error:

Exception in thread "Thread-43" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
14/03/27 07:36:13 ERROR ConnectionManager: Corresponding SendingConnectionManagerId not found

But it did stop the DStream execution.
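
I suspect the exception is because ssc.stop() also shuts down the shell's
underlying SparkContext. If your release has the overload that takes a flag
(an assumption on my part; check your version), this should stop the
streaming job while keeping sc alive for the shell:

ssc.stop(stopSparkContext = false)  // stop streaming, keep the SparkContext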

4.  Then I tried starting the ssc again (ssc.start()) and got another error:
org.apache.spark.SparkException: JobScheduler already started
Is restarting a stopped ssc supported?
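
If it isn't, I assume the fallback is simply to build a brand-new context on
top of the same sc, e.g.:

val ssc2 = new StreamingContext(sc, Seconds(5))  // define DStreams, then ssc2.start()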

5.  When I perform an operation like wordCounts.print(), that operation will
execute on each batch, every n seconds. Is there a way I can undo that
operation? That is, I want it to *stop* executing that print every n
seconds... without having to stop the stream.

What I'm really asking is... can I explore DStreams interactively the way I
can explore my data in regular Spark? In regular Spark, I might perform
various operations on an RDD to see what happens. So at first, I might have
used split(" ") to tokenize my input text, but now I want to try using
split(",") instead, after the stream has already started running. Can I do
that?

I did find out that if I add a new operation to an existing DStream (say,
words.print()) *after* the ssc.start(), it works: it *will* add the second
print() call to the execution list every n seconds.
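
In other words, with the context already running, this succeeds, and the
new output shows up starting with the next batch:

words.print()  // added after ssc.start(); runs alongside wordCounts.print()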

But if I try to add a new DStream, e.g.
...

ssc.start()

val testpairs = words.map(x => (x, "TEST"))  // new DStream defined after start()
testpairs.print()


I get an error:

14/03/27 07:57:50 ERROR JobScheduler: Error generating jobs for time 1395932270000 ms
java.lang.Exception: org.apache.spark.streaming.dstream.MappedDStream@84f0f92 has not been initialized


Is this sort of interactive use just not supported?
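
The only workaround I can think of is the heavyweight one: stop everything,
rebuild the whole DStream graph with the change, and start a fresh context.
Combining my guesses from questions 3 and 4 above (so the same version
caveats apply):

ssc.stop(stopSparkContext = false)  // keep sc for the shell
val ssc2 = new StreamingContext(sc, Seconds(5))
val stream2 = ssc2.socketTextStream("localhost", 4444)
val words2 = stream2.flatMap(line => line.split(","))  // the change I wanted
val counts2 = words2.map(x => (x, 1)).reduceByKey((x, y) => x + y)
counts2.print()
ssc2.start()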

Thanks!

Diana
