Which line is line 42 in your code? From the stack trace it looks like the entry.get call in countByIp -- parseRecord presumably returns None for a line it cannot parse, and calling .get on None throws the NoSuchElementException. And when the variable lines stops receiving data, you can stop your program; one way to detect that is sketched below.
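If you would rather have the job stop itself when the replay is finished, instead of relying on a fixed awaitTermination timeout, one option is a StreamingListener that watches numRecords on each completed batch and shuts the context down after a period of silence. A rough sketch -- the IdleStopListener name and the 60-second threshold are my own choices, so treat it as a starting point rather than a drop-in:

    import java.util.concurrent.atomic.AtomicLong
    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

    // Remembers the last time a completed batch actually contained records.
    class IdleStopListener extends StreamingListener {
      val lastDataTime = new AtomicLong(System.currentTimeMillis())

      override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
        if (batch.batchInfo.numRecords > 0) {
          lastDataTime.set(System.currentTimeMillis())
        }
      }
    }

Then, in main, register it before starting the context and poll from a separate thread (stopping from inside the listener callback itself risks a deadlock):

    val listener = new IdleStopListener
    ssc.addStreamingListener(listener)
    ssc.start()

    val idleTimeoutMs = 60000L  // assumption: a full minute with no records means the replay is done
    new Thread(new Runnable {
      def run(): Unit = {
        while (System.currentTimeMillis() - listener.lastDataTime.get() < idleTimeoutMs) {
          Thread.sleep(5000)
        }
        ssc.stop(stopSparkContext = true, stopGracefully = true)
      }
    }).start()

    ssc.awaitTermination()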
Cheers

> On Feb 23, 2016, at 12:25 AM, Femi Anthony <femib...@gmail.com> wrote:
>
> I am working with the Spark Streaming API and I wish to stream a set of
> pre-downloaded web log files continuously to simulate a real-time stream. I
> wrote a script that gunzips the compressed logs and pipes the output to nc on
> port 7777.
>
> The script looks like this:
>
> BASEDIR=/home/mysuer/data/datamining/internet_traffic_archive
> zipped_files=`find $BASEDIR -name "*.gz"`
>
> for zfile in $zipped_files
> do
>   echo "Unzipping $zfile..."
>   gunzip -c $zfile | nc -l -p 7777 -q 20
> done
>
> I have streaming code written in Scala that processes the streams. It works
> well for the most part, but when it runs out of files to stream I get the
> following error in Spark:
>
> 16/02/19 23:04:35 WARN ReceiverSupervisorImpl:
> Restarting receiver with delay 2000 ms: Socket data stream had no more data
> 16/02/19 23:04:35 ERROR ReceiverTracker: Deregistered receiver for stream 0:
> Restarting receiver with delay 2000ms: Socket data stream had no more data
> 16/02/19 23:04:35 WARN BlockManager: Block input-0-1455941075600 replicated
> to only 0 peer(s) instead of 1 peers
> ....
> 16/02/19 23:04:40 ERROR Executor: Exception in task 2.0 in stage 15.0 (TID 47)
> java.util.NoSuchElementException: None.get
>   at scala.None$.get(Option.scala:313)
>   at scala.None$.get(Option.scala:311)
>   at com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
>   at com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
>
> How do I implement a graceful shutdown so that the program exits gracefully
> when it no longer detects any data in the stream?
>
> My Spark Streaming code looks like this:
>
> object StreamingLogEnhanced {
>   def main(args: Array[String]) {
>     val master = args(0)
>     val conf = new SparkConf().setMaster(master).setAppName("StreamingLogEnhanced")
>     // Create a StreamingContext with an n-second batch size
>     val ssc = new StreamingContext(conf, Seconds(10))
>     // Create a DStream from all the input on port 7777
>     val log = Logger.getLogger(getClass.getName)
>
>     sys.ShutdownHookThread {
>       log.info("Gracefully stopping Spark Streaming Application")
>       ssc.stop(true, true)
>       log.info("Application stopped")
>     }
>     val lines = ssc.socketTextStream("localhost", 7777)
>     // Create a count of log hits by ip
>     val ipCounts = countByIp(lines)
>     ipCounts.print()
>
>     // start our streaming context and wait for it to "finish"
>     ssc.start()
>     // Wait up to 6,000 seconds (10000 * 600 ms), then exit
>     ssc.awaitTermination(10000 * 600)
>     ssc.stop()
>   }
>
>   def countByIp(lines: DStream[String]) = {
>     val parser = new AccessLogParser
>     val accessLogDStream = lines.map(line => parser.parseRecord(line))
>     val ipDStream = accessLogDStream.map(entry => (entry.get.clientIpAddress, 1))
>     ipDStream.reduceByKey((x, y) => x + y)
>   }
> }
>
> Thanks for any suggestions in advance.
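One more thing: assuming parseRecord returns an Option (which the None.get strongly suggests), countByIp can skip unparseable lines with flatMap instead of calling .get, so a single bad line no longer kills the whole batch. A sketch, untested against your parser:

    def countByIp(lines: DStream[String]) = {
      val parser = new AccessLogParser
      // flatMap over the Option keeps only successfully parsed records,
      // so a None never reaches a .get call
      lines.flatMap(line => parser.parseRecord(line))
           .map(entry => (entry.clientIpAddress, 1))
           .reduceByKey(_ + _)
    }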