I am working with the Spark Streaming API, and I want to stream a set of
pre-downloaded web log files continuously to simulate a real-time stream. I
wrote a script that gunzips the compressed logs and pipes the output to nc
on port 7777.

The script looks like this:

BASEDIR=/home/mysuer/data/datamining/internet_traffic_archive
zipped_files=`find $BASEDIR -name "*.gz"`

for zfile in $zipped_files
do
  echo "Unzipping $zfile..."
  gunzip -c $zfile | nc -l -p 7777 -q 20
done

I have streaming code written in Scala that processes the streams. It works
well for the most part, but when it runs out of files to stream I get the
following error in Spark:

16/02/19 23:04:35 WARN ReceiverSupervisorImpl:
Restarting receiver with delay 2000 ms: Socket data stream had no more data
16/02/19 23:04:35 ERROR ReceiverTracker: Deregistered receiver for stream 0:
Restarting receiver with delay 2000ms: Socket data stream had no more data
16/02/19 23:04:35 WARN BlockManager: Block input-0-1455941075600
replicated to only 0 peer(s) instead of 1 peers
....
16/02/19 23:04:40 ERROR Executor: Exception in task 2.0 in stage 15.0 (TID 47)
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:313)
at scala.None$.get(Option.scala:311)
at com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
at com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)

How do I implement a graceful shutdown so that the program exits cleanly
when it no longer detects any data in the stream?
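
To make the question concrete, this is roughly the stop-when-idle approach I
have been considering, using a StreamingListener to notice when batches stop
carrying records. It is only a sketch: IdleMonitor, GracefulStop,
stopWhenIdle, and the timeout values are my own illustrative names and
numbers, not working code from my project.

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Remembers the last time a completed batch actually contained records
class IdleMonitor extends StreamingListener {
  @volatile var lastDataSeen: Long = System.currentTimeMillis()

  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    if (batch.batchInfo.numRecords > 0) {
      lastDataSeen = System.currentTimeMillis()
    }
  }
}

object GracefulStop {
  // Register the monitor with ssc.addStreamingListener(monitor) before
  // ssc.start(), then call this in place of ssc.awaitTermination(...)
  def stopWhenIdle(ssc: StreamingContext, monitor: IdleMonitor, idleTimeoutMs: Long): Unit = {
    var stopped = false
    while (!stopped) {
      // awaitTerminationOrTimeout returns true if the context terminated on its own
      stopped = ssc.awaitTerminationOrTimeout(10 * 1000)
      if (!stopped && System.currentTimeMillis() - monitor.lastDataSeen > idleTimeoutMs) {
        // no data for idleTimeoutMs: drain in-flight batches and shut down
        ssc.stop(stopSparkContext = true, stopGracefully = true)
        stopped = true
      }
    }
  }
}

Is a StreamingListener the right tool here, or is there a more idiomatic way?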

My Spark Streaming code looks like this:

import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
// plus the import for AccessLogParser from the log-parsing library in use

object StreamingLogEnhanced {
  def main(args: Array[String]) {
    val master = args(0)
    val conf =
      new SparkConf().setMaster(master).setAppName("StreamingLogEnhanced")
    // Create a StreamingContext with a 10 second batch size
    val ssc = new StreamingContext(conf, Seconds(10))
    val log = Logger.getLogger(getClass.getName)

    sys.ShutdownHookThread {
      log.info("Gracefully stopping Spark Streaming Application")
      ssc.stop(stopSparkContext = true, stopGracefully = true)
      log.info("Application stopped")
    }
    // Create a DStream from all the input on port 7777
    val lines = ssc.socketTextStream("localhost", 7777)
    // Create a count of log hits by ip
    val ipCounts = countByIp(lines)
    ipCounts.print()

    // Start our streaming context and wait for it to "finish"
    ssc.start()
    // Wait up to 600 seconds, then exit
    ssc.awaitTermination(600 * 1000)
    ssc.stop()
  }

  def countByIp(lines: DStream[String]) = {
    val parser = new AccessLogParser
    val accessLogDStream = lines.map(line => parser.parseRecord(line))
    // NOTE: this is line 42 from the stack trace; entry.get throws
    // NoSuchElementException when parseRecord returns None
    val ipDStream = accessLogDStream.map(entry =>
      (entry.get.clientIpAddress, 1))
    ipDStream.reduceByKey((x, y) => x + y)
  }
}
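
As an aside, I suspect the None.get comes from the .get in countByIp.
Assuming parseRecord returns an Option, a safer variant that simply drops
unparseable lines with flatMap might look like this (again just a sketch):

def countByIp(lines: DStream[String]) = {
  val parser = new AccessLogParser
  // flatMap keeps only the lines that parse successfully, so an empty or
  // malformed line can no longer trigger NoSuchElementException: None.get
  lines
    .flatMap(line => parser.parseRecord(line))
    .map(entry => (entry.clientIpAddress, 1))
    .reduceByKey(_ + _)
}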

Thanks in advance for any suggestions.
