I am working with the Spark Streaming API and I want to stream a set of pre-downloaded web log files continuously to simulate a real-time stream. I wrote a script that gunzips the compressed logs and pipes the output to nc on port 7777.
The script looks like this:

```bash
BASEDIR=/home/mysuer/data/datamining/internet_traffic_archive
zipped_files=`find $BASEDIR -name "*.gz"`

for zfile in $zipped_files
do
    echo "Unzipping $zfile..."
    gunzip -c $zfile | nc -l -p 7777 -q 20
done
```

I have streaming code written in Scala that processes the stream. It works well for the most part, but when it runs out of files to stream I get the following error in Spark:

```
16/02/19 23:04:35 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Socket data stream had no more data
16/02/19 23:04:35 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Socket data stream had no more data
16/02/19 23:04:35 WARN BlockManager: Block input-0-1455941075600 replicated to only 0 peer(s) instead of 1 peers
....
16/02/19 23:04:40 ERROR Executor: Exception in task 2.0 in stage 15.0 (TID 47)
java.util.NoSuchElementException: None.get
        at scala.None$.get(Option.scala:313)
        at scala.None$.get(Option.scala:311)
        at com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
        at com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
```

How do I implement a graceful shutdown so that the program exits gracefully when it no longer detects any data in the stream?

My Spark Streaming code looks like this:

```scala
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

// AccessLogParser is defined elsewhere in my project

object StreamingLogEnhanced {
  def main(args: Array[String]) {
    val master = args(0)
    val conf = new SparkConf().setMaster(master).setAppName("StreamingLogEnhanced")
    // Create a StreamingContext with a 10 second batch size
    val ssc = new StreamingContext(conf, Seconds(10))

    val log = Logger.getLogger(getClass.getName)
    sys.ShutdownHookThread {
      log.info("Gracefully stopping Spark Streaming Application")
      ssc.stop(true, true)
      log.info("Application stopped")
    }

    // Create a DStream from all the input on port 7777
    val lines = ssc.socketTextStream("localhost", 7777)
    // Create a count of log hits by ip
    var ipCounts = countByIp(lines)
    ipCounts.print()

    // Start our streaming context and wait for it to "finish"
    ssc.start()
    // Wait for up to 10000 * 600 ms (100 minutes), then exit
    ssc.awaitTermination(10000 * 600)
    ssc.stop()
  }

  def countByIp(lines: DStream[String]) = {
    val parser = new AccessLogParser
    val accessLogDStream = lines.map(line => parser.parseRecord(line))
    val ipDStream = accessLogDStream.map(entry => (entry.get.clientIpAddress, 1))
    ipDStream.reduceByKey((x, y) => x + y)
  }
}
```

Thanks for any suggestions in advance.
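For reference, here is the direction I have been experimenting with, though I am not sure it is the right approach: replace the fixed `awaitTermination` timeout with a polling loop plus a `StreamingListener` that records the last time a completed batch actually contained records, then stop gracefully once an idle threshold passes (`idleTimeoutMs` below is just a number I picked):

```scala
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Record the last wall-clock time a completed batch actually contained records
val lastDataSeen = new AtomicLong(System.currentTimeMillis())
ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    if (batchCompleted.batchInfo.numRecords > 0) {
      lastDataSeen.set(System.currentTimeMillis())
    }
  }
})

ssc.start()

val idleTimeoutMs = 60000L // stop after 60 s with no new records; arbitrary threshold
var stopped = false
while (!stopped) {
  // awaitTerminationOrTimeout returns true if the context stopped while waiting
  stopped = ssc.awaitTerminationOrTimeout(10000)
  if (!stopped && System.currentTimeMillis() - lastDataSeen.get() > idleTimeoutMs) {
    // No records for a while: assume the source is exhausted and stop gracefully
    ssc.stop(stopSparkContext = true, stopGracefully = true)
    stopped = true
  }
}
```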
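Separately, I suspect the `None.get` at `StreamingLogEnhanced.scala:42` happens because `parseRecord` returns `None` for lines it cannot parse (for example empty lines once the socket goes quiet) and `entry.get` then blows up. Guarding with `flatMap` (assuming `parseRecord` returns an `Option`) silences that exception, though it does not answer the shutdown question:

```scala
def countByIp(lines: DStream[String]) = {
  val parser = new AccessLogParser
  // flatMap keeps only successfully parsed records, so .get is never called on a None
  val accessLogDStream = lines.flatMap(line => parser.parseRecord(line))
  val ipDStream = accessLogDStream.map(entry => (entry.clientIpAddress, 1))
  ipDStream.reduceByKey((x, y) => x + y)
}
```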