I'm working with Spark Streaming in spark-shell, and I'm hoping folks can
answer a few questions.
I'm doing WordCount on a socket stream:
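// In spark-shell, `sc` is the shell's existing SparkContext.
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._ // pair-DStream implicits (reduceByKey) in older releases
// Process the socket stream in 5-second batches.
val ssc = new StreamingContext(sc, Seconds(5))
// Read lines of text from a TCP socket served by nc.
val mystream = ssc.socketTextStream("localhost", 4444)
val words = mystream.flatMap(line => line.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
// Output operation: prints the first few counts of every batch.
wordCounts.print()
ssc.start()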
1. I'm assuming that using spark-shell is an edge case, and that Spark
Streaming is really intended mostly for non-interactive use, i.e. as
standalone applications rather than in the shell. True?
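For comparison, here is a minimal sketch of the same word count as a
standalone application instead of a shell session. It assumes the master URL
is supplied by the launcher (for example spark-submit in Spark 1.0 and later);
the object name StreamingWordCount and the hostAndPort argument handling are
hypothetical. The main difference from the shell version is
ssc.awaitTermination(), which keeps the driver's main thread waiting while
the stream runs.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._ // pair-DStream implicits in older releases

object StreamingWordCount {
  // Hypothetical argument handling: defaults match the nc setup (localhost:4444).
  def hostAndPort(args: Array[String]): (String, Int) = {
    val host = if (args.length > 0) args(0) else "localhost"
    val port = if (args.length > 1) args(1).toInt else 4444
    (host, port)
  }

  def main(args: Array[String]): Unit = {
    val (host, port) = hostAndPort(args)
    // No setMaster here: the master URL is expected to come from the launcher.
    val conf = new SparkConf().setAppName("StreamingWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))

    val words = ssc.socketTextStream(host, port).flatMap(_.split(" "))
    words.map(w => (w, 1)).reduceByKey(_ + _).print()

    ssc.start()
    // Block until the context is stopped; without this, main() would
    // return as soon as the stream had started.
    ssc.awaitTermination()
  }
}
```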
2. I notice that once I call ssc.start(), my stream starts processing and
continues indefinitely, even if I close the socket on the server end. (I'm
using the Unix command "nc" to mimic a server, as explained in the Spark
Streaming Programming Guide.) Can I tell my stream to detect that it has lost
the connection and stop executing? (Or, even better, to attempt to
re-establish the connection?)
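One way to get reconnect behaviour under your own control is a custom
receiver. This is a sketch assuming the Receiver API of Spark 1.0 and later
(org.apache.spark.streaming.receiver.Receiver, plugged in with
ssc.receiverStream); older releases use a different receiver API. The class
name ReconnectingSocketReceiver is hypothetical. When the server closes the
socket, readLine() returns null and the receiver calls restart(...), which
asks Spark to stop and then start the receiver again, i.e. reconnect.

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.{ConnectException, Socket}
import java.nio.charset.StandardCharsets

import scala.util.control.NonFatal

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver: reads lines from host:port and, when the server
// closes the connection or cannot be reached, asks Spark to restart it.
class ReconnectingSocketReceiver(host: String, port: Int)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // onStart() must not block, so receive on a separate thread.
    new Thread("ReconnectingSocketReceiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {
    // Nothing to do here: receive() checks isStopped() after each line and
    // closes its own socket when it exits.
  }

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      try {
        var line = reader.readLine()
        while (!isStopped() && line != null) {
          store(line) // hand the line to Spark
          line = reader.readLine()
        }
      } finally {
        reader.close()
        socket.close()
      }
      // readLine() returned null: the server closed the connection.
      if (!isStopped()) {
        restart(s"Connection to $host:$port closed, reconnecting")
      }
    } catch {
      case e: ConnectException =>
        restart(s"Could not connect to $host:$port, retrying", e)
      case NonFatal(e) =>
        restart("Error receiving data", e)
    }
  }
}
```

It would replace socketTextStream, e.g.
ssc.receiverStream(new ReconnectingSocketReceiver("localhost", 4444)). To stop
receiving instead of reconnecting, the receiver could call stop(message) in
place of restart(...); that stops only this receiver, not the
StreamingContext. Since the receiver object is serialized and shipped to an
executor, defining it in a compiled jar is probably safer than in the shell.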
3. I tried entering ssc.stop, which resulted in an error:
Exception in thread "Thread-43" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
14/03/27 07:36:13 ERROR ConnectionManager: Corresponding SendingConnectionManagerId not found
But it did stop the DStream execution.
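By default, ssc.stop() also stops the underlying SparkContext, which would be
consistent with the "SparkContext was shut down" error above. A sketch,
assuming a release where StreamingContext.stop takes a stopSparkContext flag:
stop only the streaming side and keep using the same SparkContext. In
spark-shell this is just ssc.stop(stopSparkContext = false). The
self-contained version below uses queueStream in place of the socket so it
runs without nc; the object name StopStreamingOnly is hypothetical.

```scala
import scala.collection.mutable

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._ // pair-DStream implicits in older releases

object StopStreamingOnly {
  // Runs a short-lived stream on `sc`, stops only the streaming side, then
  // runs an ordinary job on the same SparkContext and returns its result.
  def run(sc: SparkContext): Long = {
    val ssc = new StreamingContext(sc, Seconds(1))

    // queueStream stands in for the socket, so no nc server is needed.
    val queue = mutable.Queue[RDD[String]](sc.parallelize(Seq("a b a")))
    ssc.queueStream(queue)
      .flatMap(_.split(" "))
      .map(w => (w, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    Thread.sleep(3000) // let a few one-second batches run

    // Stop the StreamingContext but leave `sc` running.
    ssc.stop(stopSparkContext = false)

    // `sc` still accepts regular (non-streaming) jobs.
    sc.parallelize(1 to 10).count()
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("StopStreamingOnly"))
    try println(run(sc)) finally sc.stop()
  }
}
```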
4. Then I tried starting ssc again (ssc.start) and got another error:
org.apache.spark.SparkException: JobScheduler already started
Is restarting a StreamingContext supported?
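The Spark Streaming programming guide says a stopped StreamingContext cannot
be restarted, so one workaround is to create a new StreamingContext on the
same SparkContext and define the DStreams again. A sketch with a hypothetical
helper, WordCountContext.build, that rebuilds the whole pipeline each time it
is called:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._ // pair-DStream implicits in older releases

object WordCountContext {
  // Hypothetical helper: creates a fresh StreamingContext on an existing
  // SparkContext and defines the whole word-count pipeline on it. Call it
  // again (after stopping the old context) whenever you want a "restart".
  def build(sc: SparkContext, host: String, port: Int, delimiter: String): StreamingContext = {
    val ssc = new StreamingContext(sc, Seconds(5))
    val words = ssc.socketTextStream(host, port).flatMap(_.split(delimiter))
    words.map(w => (w, 1)).reduceByKey(_ + _).print()
    ssc // not started yet; the caller decides when to call start()
  }
}
```

In the shell, with var ssc = WordCountContext.build(sc, "localhost", 4444, " "),
a "restart" would then be ssc.stop(stopSparkContext = false), followed by
ssc = WordCountContext.build(sc, "localhost", 4444, ",") and ssc.start().
DStream variables defined on the old context belong to the stopped context,
so they would have to be redefined as well.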
5. When I call an output operation like wordCounts.print(), that operation
executes on each batch, every n seconds. Is there a way I can undo that
operation? That is, I want it to *stop* executing that print every n
seconds, without having to stop the stream.
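As far as I know there is no call to unregister an output operation, but one
possible workaround is to register a print-like operation through foreachRDD
that consults a driver-side switch. This is a sketch; PrintSwitch and
SwitchablePrint are hypothetical names. The function passed to foreachRDD
runs on the driver for every batch, so it sees the switch's current value.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

// Hypothetical driver-side switch; it can be flipped from the shell while the
// stream is running, e.g. PrintSwitch.enabled = false.
object PrintSwitch {
  @volatile var enabled: Boolean = true
}

object SwitchablePrint {
  // Registers an output operation (still before ssc.start()) that prints up
  // to `num` elements of each batch only while the switch is on. When the
  // switch is off it calls no RDD action, so no Spark job runs for it.
  def printWhileEnabled[T](stream: DStream[T], num: Int = 10): Unit =
    stream.foreachRDD { (rdd: RDD[T], time: Time) =>
      if (PrintSwitch.enabled) {
        println(s"Time: $time")
        rdd.take(num).foreach(println)
      }
    }
}
```

The idea would be to call SwitchablePrint.printWhileEnabled(wordCounts)
instead of wordCounts.print() before ssc.start(), and later set
PrintSwitch.enabled = false to silence it. The output operation itself stays
registered; only its effect is switched off.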
What I'm really asking is: can I explore DStreams interactively the way I
can explore my data in regular Spark? In regular Spark, I might perform
various operations on an RDD to see what happens. So at first I might have
used split(" ") to tokenize my input text, but now I want to try
split(",") instead, after the stream has already started running.
Can I do that?
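One way to approximate this without redefining DStreams is to read the
delimiter from a driver-side variable inside transform, whose outer function
runs on the driver once per batch. This is a sketch under that assumption;
Tokenizer and ReconfigurableWords are hypothetical names, and the transform
still has to be set up before ssc.start().

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// Hypothetical driver-side setting that can be changed while the stream runs,
// e.g. Tokenizer.delimiter = ",".
object Tokenizer {
  @volatile var delimiter: String = " "

  // Plain helper, testable without Spark. String.split treats `delim` as a
  // regular expression, which is fine for " " and "," but not for e.g. "|".
  def tokenize(line: String, delim: String): Array[String] = line.split(delim)
}

object ReconfigurableWords {
  // Each batch reads the current delimiter on the driver; only the local
  // copy `d` is captured by the closure that runs on the executors.
  def words(lines: DStream[String]): DStream[String] =
    lines.transform { (rdd: RDD[String]) =>
      val d = Tokenizer.delimiter
      rdd.flatMap(line => Tokenizer.tokenize(line, d))
    }
}
```

With words = ReconfigurableWords.words(mystream) defined before ssc.start(),
setting Tokenizer.delimiter = "," later would change tokenizing for batches
generated after the change.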
I did find out that if I add a new output operation to an existing DStream
(say, words.print()) *after* ssc.start, it works: it *will* add the second
print() call to the operations executed every n seconds.
But if I try to add new DStreams, e.g.:
...
ssc.start()
var testpairs = words.map(x => (x, "TEST"))
testpairs.print()
I get an error:
14/03/27 07:57:50 ERROR JobScheduler: Error generating jobs for time 1395932270000 ms
java.lang.Exception: org.apache.spark.streaming.dstream.MappedDStream@84f0f92 has not been initialized
Is this sort of interactive use just not supported?
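The Spark Streaming programming guide says no new streaming computations can
be set up once a context has started, which fits the error above. One way to
keep most of the interactive feel is to develop the per-batch logic as an
ordinary RDD function, try it on a static RDD in the shell, and only then
wire it into a stream. A sketch with a hypothetical WordCountLogic.countWords:

```scala
import org.apache.spark.SparkContext._ // pair-RDD implicits in older releases
import org.apache.spark.rdd.RDD

object WordCountLogic {
  // The per-batch logic as a plain RDD => RDD function, so it can be tried
  // on a static RDD in the shell and later reused unchanged on a DStream.
  def countWords(delimiter: String)(lines: RDD[String]): RDD[(String, Int)] =
    lines.flatMap(_.split(delimiter)).map(w => (w, 1)).reduceByKey(_ + _)
}
```

In the shell one could run
WordCountLogic.countWords(",")(sc.parallelize(Seq("a,b", "b,c"))).collect(),
and once the result looks right, define
mystream.transform(rdd => WordCountLogic.countWords(",")(rdd)) on a fresh
StreamingContext before calling start().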
Thanks!
Diana