Which line is line 42 in your code ?

When variable lines becomes empty, you can stop your program. 

Cheers

> On Feb 23, 2016, at 12:25 AM, Femi Anthony <femib...@gmail.com> wrote:
> 
> I am working on Spark Streaming API and I wish to stream a set of 
> pre-downloaded web log files continuously to simulate a real-time stream. I 
> wrote a script that gunzips the compressed logs and pipes the output to nc on 
> port 7777.
> 
> The script looks like this:
> 
> BASEDIR=/home/mysuer/data/datamining/internet_traffic_archive
> zipped_files=`find $BASEDIR -name "*.gz"`
> 
> for zfile in $zipped_files
>  do
>   echo "Unzipping $zfile..."
>   gunzip -c $zfile  | nc -l -p 7777 -q 20
> 
>  done
> I have streaming code written in Scala that processes the streams. It works 
> well for the most part, but when its run out of files to stream I get the 
> following error in Spark:
> 
> 16/02/19 23:04:35 WARN ReceiverSupervisorImpl: 
> Restarting receiver with delay 2000 ms: Socket data stream had no more data
> 16/02/19 23:04:35 ERROR ReceiverTracker: Deregistered receiver for stream 0: 
> Restarting receiver with delay 2000ms: Socket data stream had no more data
> 16/02/19 23:04:35 WARN BlockManager: Block input-0-1455941075600 replicated 
> to only 0 peer(s) instead of 1 peers
> ....
> 16/02/19 23:04:40 ERROR Executor: Exception in task 2.0 in stage 15.0 (TID 47)
> java.util.NoSuchElementException: None.get
> at scala.None$.get(Option.scala:313)
> at scala.None$.get(Option.scala:311)
> at 
> com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
> at 
> com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
> How to I implement a graceful shutdown so that the program exits gracefully 
> when it no longer detects any data in the stream ?
> 
> My Spark Streaming code looks like this:
> 
> 
> object StreamingLogEnhanced {
>  def main(args: Array[String]) {
>   val master = args(0)
>   val conf = new    
>      SparkConf().setMaster(master).setAppName("StreamingLogEnhanced")
>  // Create a StreamingContext with a n second batch size
>   val ssc = new StreamingContext(conf, Seconds(10))
>  // Create a DStream from all the input on port 7777
>   val log = Logger.getLogger(getClass.getName)
> 
>   sys.ShutdownHookThread {
>   log.info("Gracefully stopping Spark Streaming Application")
>   ssc.stop(true, true)
>   log.info("Application stopped")
>   }
>   val lines = ssc.socketTextStream("localhost", 7777)
>   // Create a count of log hits by ip
>   var ipCounts=countByIp(lines)
>   ipCounts.print()
> 
>   // start our streaming context and wait for it to "finish"
>   ssc.start()
>   // Wait for 600 seconds then exit
>   ssc.awaitTermination(10000*600)
>   ssc.stop()
>   }
> 
>  def countByIp(lines: DStream[String]) = {
>    val parser = new AccessLogParser
>    val accessLogDStream = lines.map(line => parser.parseRecord(line))
>    val ipDStream = accessLogDStream.map(entry =>   
>                     (entry.get.clientIpAddress, 1))
>    ipDStream.reduceByKey((x, y) => x + y)
>  }
> 
> }
> Thanks for any suggestions in advance.
> 
> 

Reply via email to