Just out of curiosity, I would like to know why a streaming program should
shut down when no new data is arriving. I think it should keep waiting for
new records to arrive.

Thanks
Ashutosh

On Tue, Feb 23, 2016 at 9:17 PM, Hemant Bhanawat <hemant9...@gmail.com>
wrote:

> A guess - parseRecord is returning None in some cases (probably empty
> lines), and then entry.get is throwing the exception.
>
> You may want to filter the None values from accessLogDStream before you
> run the map function over it.
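>
> For example, a minimal sketch of that filtering (assuming parseRecord
> returns an Option, as the None.get stack trace suggests):
>
>   // Parse each line, then keep only the records that parsed successfully.
>   val accessLogDStream = lines.map(line => parser.parseRecord(line))
>   val validRecords = accessLogDStream.filter(_.isDefined).map(_.get)
>   val ipDStream = validRecords.map(entry => (entry.clientIpAddress, 1))
>
>   // Equivalently, flatMap collapses the parse and the filter into one step:
>   // val validRecords = lines.flatMap(line => parser.parseRecord(line))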
>
> Hemant
>
> Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
> www.snappydata.io
>
> On Tue, Feb 23, 2016 at 6:00 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Which line is line 42 in your code?
>>
>> When the lines stream stops receiving data, you can stop your program.
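>>
>> One way to do that - just a sketch using the StreamingListener API, with
>> an arbitrary threshold of 3 consecutive empty batches - is to count the
>> records in each completed batch and stop once nothing has arrived for a
>> while:
>>
>>   import java.util.concurrent.atomic.AtomicInteger
>>   import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}
>>
>>   val emptyBatches = new AtomicInteger(0)
>>   ssc.addStreamingListener(new StreamingListener {
>>     override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
>>       if (batch.batchInfo.numRecords == 0) emptyBatches.incrementAndGet()
>>       else emptyBatches.set(0)
>>     }
>>   })
>>
>>   ssc.start()
>>   // Poll from the main thread rather than stopping inside the listener;
>>   // stop gracefully after 3 consecutive empty batches.
>>   while (!ssc.awaitTerminationOrTimeout(10000)) {
>>     if (emptyBatches.get() >= 3) ssc.stop(true, true)
>>   }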
>>
>> Cheers
>>
>> On Feb 23, 2016, at 12:25 AM, Femi Anthony <femib...@gmail.com> wrote:
>>
>> I am working with the Spark Streaming API, and I wish to stream a set of
>> pre-downloaded web log files continuously to simulate a real-time stream. I
>> wrote a script that gunzips the compressed logs and pipes the output to nc
>> on port 7777.
>>
>> The script looks like this:
>>
>> BASEDIR=/home/mysuer/data/datamining/internet_traffic_archive
>> zipped_files=`find $BASEDIR -name "*.gz"`
>>
>> for zfile in $zipped_files
>> do
>>   echo "Unzipping $zfile..."
>>   gunzip -c "$zfile" | nc -l -p 7777 -q 20
>> done
>>
>> I have streaming code written in Scala that processes the streams. It
>> works well for the most part, but when it runs out of files to stream I get
>> the following error in Spark:
>>
>> 16/02/19 23:04:35 WARN ReceiverSupervisorImpl:
>> Restarting receiver with delay 2000 ms: Socket data stream had no more data
>> 16/02/19 23:04:35 ERROR ReceiverTracker: Deregistered receiver for stream 0:
>> Restarting receiver with delay 2000ms: Socket data stream had no more data
>> 16/02/19 23:04:35 WARN BlockManager: Block input-0-1455941075600 replicated 
>> to only 0 peer(s) instead of 1 peers
>> ....
>> 16/02/19 23:04:40 ERROR Executor: Exception in task 2.0 in stage 15.0 (TID 
>> 47)
>> java.util.NoSuchElementException: None.get
>> at scala.None$.get(Option.scala:313)
>> at scala.None$.get(Option.scala:311)
>> at 
>> com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
>> at 
>> com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
>>
>> How do I implement a graceful shutdown so that the program exits
>> gracefully when it no longer detects any data in the stream?
>>
>> My Spark Streaming code looks like this:
>>
>> import org.apache.log4j.Logger
>> import org.apache.spark.SparkConf
>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>> import org.apache.spark.streaming.dstream.DStream
>>
>> object StreamingLogEnhanced {
>>   def main(args: Array[String]) {
>>     val master = args(0)
>>     val conf =
>>       new SparkConf().setMaster(master).setAppName("StreamingLogEnhanced")
>>     // Create a StreamingContext with a 10 second batch size
>>     val ssc = new StreamingContext(conf, Seconds(10))
>>     val log = Logger.getLogger(getClass.getName)
>>
>>     sys.ShutdownHookThread {
>>       log.info("Gracefully stopping Spark Streaming Application")
>>       ssc.stop(true, true)
>>       log.info("Application stopped")
>>     }
>>     // Create a DStream from all the input on port 7777
>>     val lines = ssc.socketTextStream("localhost", 7777)
>>     // Create a count of log hits by ip
>>     val ipCounts = countByIp(lines)
>>     ipCounts.print()
>>
>>     // Start our streaming context and wait for it to "finish"
>>     ssc.start()
>>     // Wait for up to 10000 * 600 ms (100 minutes), then exit
>>     ssc.awaitTermination(10000 * 600)
>>     ssc.stop()
>>   }
>>
>>   def countByIp(lines: DStream[String]) = {
>>     val parser = new AccessLogParser
>>     // parseRecord returns an Option; entry.get below is line 42 in the
>>     // stack trace and throws NoSuchElementException when it gets None
>>     val accessLogDStream = lines.map(line => parser.parseRecord(line))
>>     val ipDStream = accessLogDStream.map(entry =>
>>                       (entry.get.clientIpAddress, 1))
>>     ipDStream.reduceByKey((x, y) => x + y)
>>   }
>> }
>>
>> Thanks in advance for any suggestions.
>>
>>
>
