Hi, I used the following code for graceful shutdown of my streaming app; this may not be the best way, so please correct me.
    sys.ShutdownHookThread {
      println("Gracefully stopping Spark Streaming Application")
      ssc.stop(true, true)
      println("Application stopped")
    }

    class StopContextThread(ssc: StreamingContext) extends Runnable {
      def run { ssc.stop(true, true) }
    }

And I use this whenever I want to start a graceful shutdown:

    val thread: Thread = new Thread(new StopContextThread(ssc))
    thread.start

On Tue, Mar 1, 2016 at 2:32 PM, Lars Albertsson <la...@mapflat.com> wrote:
> If you wait for an inactivity period before ending a test case, you get
> the choice of using a long timeout, resulting in slow tests, or a short
> timeout, resulting in brittle tests. Both options will make developers
> waste time and harm developer productivity.
>
> I suggest that you terminate the test case based on the test predicate
> getting fulfilled, with a long timeout in case of test failure. I presume
> that your application produces output to a channel, e.g. a database or a
> Kafka topic, which the test oracle can inspect for test completeness.
>
> The flow becomes:
>
> 1. Start the stream source component and output component locally,
>    e.g. Kafka + Cassandra.
> 2. Start the Spark streaming application, typically with a local master.
> 3. Feed the input into the stream source, e.g. a Kafka test topic.
> 4. Let the test oracle loop, polling for a condition to be met on the
>    output, e.g. the existence of a database entry or a message on the
>    Kafka output topic. Sleep for a short period (10 ms) between polls,
>    and fail the test if the condition is not met after a long time (30 s).
> 5. Terminate the streaming application.
> 6. Terminate the stream source and output components.
>
> I have used this strategy for testing both financial transaction systems
> and Spark streaming applications, and it has resulted in both fast and
> reliable tests, without strong coupling between production code and test
> code.
>
> The base strategy fails if you need to test for the absence of an output
> event, e.g. "when my streaming application sees message X, it should
> filter it and not produce output". You then need to send another input
> event (poison pill pattern), and terminate on the output effects of the
> poison pill event.
>
> If you test with multiple streaming executors, remember that there are no
> order guarantees between executors; you will need to make sure that all
> executors receive poison pills, and test for the pill effects of all of
> them.
>
> Starting the source/output components and the Spark context can be slow.
> I recommend that you provide the option to reuse the test fixture between
> test cases for speed. For example, if you start and stop the fixture once
> for each test class, rather than once per test method, you save a lot of
> time.
>
> Regards,
>
> Lars Albertsson
> Data engineering consultant
> www.mapflat.com
> +46 70 7687109
>
> On Fri, Feb 26, 2016 at 8:24 AM, Mao, Wei <wei....@intel.com> wrote:
> > I would argue against making it configurable unless there is a real
> > production use case. If it's just for tests, there are a bunch of ways
> > to achieve it. For example, you can mark globally whether the test
> > stream is finished, and stop the ssc on another thread when the status
> > of that mark changes.
> >
> > Back to the original exception: blindly calling "Option.get" is never a
> > good practice. It would be better to pre-validate or to use
> > getOption/getOrElse.
> >
> > Thanks,
> > William
> >
> > From: Cheng, Hao [mailto:hao.ch...@intel.com]
> > Sent: Thursday, February 25, 2016 1:03 AM
> > To: Daniel Siegmann; Ashutosh Kumar
> > Cc: Hemant Bhanawat; Ted Yu; Femi Anthony; user
> > Subject: RE: Spark Streaming - graceful shutdown when stream has no
> > more data
> >
> > This is very interesting: how to shut down the streaming job gracefully
> > once there has been no input data for some time.
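Lars's polling oracle can be sketched in Scala roughly as follows. This is a minimal illustration under stated assumptions, not his actual test code: `awaitCondition` and the toy in-memory output channel are made-up stand-ins for whatever channel (database, Kafka topic) the real test inspects.

```scala
import java.util.concurrent.atomic.AtomicReference

object PollingOracle {
  // Poll `condition` every `pollMs` milliseconds; give up after `timeoutMs`.
  // The caller fails the test when this returns false.
  def awaitCondition(condition: () => Boolean,
                     pollMs: Long = 10,
                     timeoutMs: Long = 30000): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMs
    while (System.currentTimeMillis() < deadline) {
      if (condition()) return true
      Thread.sleep(pollMs)
    }
    false
  }

  def main(args: Array[String]): Unit = {
    // Toy stand-in for the output channel: a background thread produces
    // a record after a short delay, as a streaming job would.
    val output = new AtomicReference[List[String]](Nil)
    new Thread(new Runnable {
      def run(): Unit = {
        Thread.sleep(50)
        output.set(List("expected-record"))
      }
    }).start()

    // The oracle loops until the record appears (or 30 s pass).
    println("condition met: " + awaitCondition(() => output.get.nonEmpty))
  }
}
```

The short poll interval keeps the test fast in the common (passing) case, while the long timeout only bites when the test actually fails.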
> > A doable solution: you can count the input data using an Accumulator,
> > and another thread (on the master node) keeps polling the latest
> > accumulator value; if the value has not changed for some time, shut
> > down the streaming job.
> >
> > From: Daniel Siegmann [mailto:daniel.siegm...@teamaol.com]
> > Sent: Wednesday, February 24, 2016 12:30 AM
> > To: Ashutosh Kumar <kmr.ashutos...@gmail.com>
> > Cc: Hemant Bhanawat <hemant9...@gmail.com>; Ted Yu <yuzhih...@gmail.com>;
> > Femi Anthony <femib...@gmail.com>; user <user@spark.apache.org>
> > Subject: Re: Spark Streaming - graceful shutdown when stream has no
> > more data
> >
> > During testing you will typically be using some finite data. You want
> > the stream to shut down automatically when that data has been consumed
> > so your test shuts down gracefully.
> >
> > Of course, once the code is running in production you'll want it to
> > keep waiting for new records. So whether the stream shuts down when
> > there's no more data should be configurable.
> >
> > On Tue, Feb 23, 2016 at 11:09 AM, Ashutosh Kumar
> > <kmr.ashutos...@gmail.com> wrote:
> >
> > Just out of curiosity, I would like to know why a streaming program
> > should shut down when no new data is arriving. I think it should keep
> > waiting for the arrival of new records.
> >
> > Thanks
> > Ashutosh
> >
> > On Tue, Feb 23, 2016 at 9:17 PM, Hemant Bhanawat <hemant9...@gmail.com>
> > wrote:
> >
> > A guess - parseRecord is returning None in some cases (probably empty
> > lines), and then entry.get is throwing the exception.
> >
> > You may want to filter the None values from accessLogDStream before you
> > run the map function over it.
> >
> > Hemant
> > www.snappydata.io
> >
> > On Tue, Feb 23, 2016 at 6:00 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> >
> > Which line is line 42 in your code?
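Cheng Hao's accumulator-watchdog idea could be sketched as below. This is an assumed shape, not Spark API: `readCount` and `onIdle` are injected so the sketch runs without Spark, but in a real job `readCount` would read the accumulator's value on the driver and `onIdle` would call `ssc.stop(stopSparkContext = true, stopGracefully = true)`.

```scala
// A driver-side thread that polls a counter (e.g. an accumulator
// incremented per input record) and triggers shutdown once the value
// has stopped changing for `idleMs` milliseconds.
class IdleWatchdog(readCount: () => Long,
                   idleMs: Long,
                   pollMs: Long,
                   onIdle: () => Unit) extends Runnable {
  def run(): Unit = {
    var last = readCount()
    var lastChange = System.currentTimeMillis()
    var running = true
    while (running) {
      Thread.sleep(pollMs)
      val current = readCount()
      val now = System.currentTimeMillis()
      if (current != last) {
        // Progress was made; remember the new value and timestamp.
        last = current
        lastChange = now
      } else if (now - lastChange >= idleMs) {
        // No change for the whole idle window: request shutdown and exit.
        onIdle()
        running = false
      }
    }
  }
}
```

Hypothetical usage on the driver, after `ssc.start()`, assuming `recordCount` is an accumulator incremented for each input record: `new Thread(new IdleWatchdog(() => recordCount.value, 60000, 1000, () => ssc.stop(true, true))).start()`.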
> >
> > When the variable lines becomes empty, you can stop your program.
> >
> > Cheers
> >
> > On Feb 23, 2016, at 12:25 AM, Femi Anthony <femib...@gmail.com> wrote:
> >
> > I am working with the Spark Streaming API and I wish to stream a set of
> > pre-downloaded web log files continuously to simulate a real-time
> > stream. I wrote a script that gunzips the compressed logs and pipes the
> > output to nc on port 7777.
> >
> > The script looks like this:
> >
> >     BASEDIR=/home/mysuer/data/datamining/internet_traffic_archive
> >     zipped_files=`find $BASEDIR -name "*.gz"`
> >
> >     for zfile in $zipped_files
> >     do
> >       echo "Unzipping $zfile..."
> >       gunzip -c $zfile | nc -l -p 7777 -q 20
> >     done
> >
> > I have streaming code written in Scala that processes the streams. It
> > works well for the most part, but when it runs out of files to stream I
> > get the following error in Spark:
> >
> >     16/02/19 23:04:35 WARN ReceiverSupervisorImpl:
> >       Restarting receiver with delay 2000 ms: Socket data stream had no more data
> >     16/02/19 23:04:35 ERROR ReceiverTracker: Deregistered receiver for stream 0:
> >       Restarting receiver with delay 2000ms: Socket data stream had no more data
> >     16/02/19 23:04:35 WARN BlockManager: Block input-0-1455941075600
> >       replicated to only 0 peer(s) instead of 1 peers
> >
> >     ....
> >     16/02/19 23:04:40 ERROR Executor: Exception in task 2.0 in stage 15.0 (TID 47)
> >     java.util.NoSuchElementException: None.get
> >         at scala.None$.get(Option.scala:313)
> >         at scala.None$.get(Option.scala:311)
> >         at com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
> >         at com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
> >
> > How do I implement a graceful shutdown so that the program exits
> > gracefully when it no longer detects any data in the stream?
> >
> > My Spark Streaming code looks like this:
> >
> >     object StreamingLogEnhanced {
> >       def main(args: Array[String]) {
> >         val master = args(0)
> >         val conf = new SparkConf().setMaster(master).setAppName("StreamingLogEnhanced")
> >         // Create a StreamingContext with an n second batch size
> >         val ssc = new StreamingContext(conf, Seconds(10))
> >         val log = Logger.getLogger(getClass.getName)
> >
> >         sys.ShutdownHookThread {
> >           log.info("Gracefully stopping Spark Streaming Application")
> >           ssc.stop(true, true)
> >           log.info("Application stopped")
> >         }
> >
> >         // Create a DStream from all the input on port 7777
> >         val lines = ssc.socketTextStream("localhost", 7777)
> >         // Create a count of log hits by ip
> >         var ipCounts = countByIp(lines)
> >         ipCounts.print()
> >
> >         // Start our streaming context and wait for it to "finish"
> >         ssc.start()
> >         // Wait for 600 seconds then exit
> >         ssc.awaitTermination(10000 * 600)
> >         ssc.stop()
> >       }
> >
> >       def countByIp(lines: DStream[String]) = {
> >         val parser = new AccessLogParser
> >         val accessLogDStream = lines.map(line => parser.parseRecord(line))
> >         val ipDStream = accessLogDStream.map(entry => (entry.get.clientIpAddress, 1))
> >         ipDStream.reduceByKey((x, y) => x + y)
> >       }
> >     }
> >
> > Thanks for any suggestions in advance.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

--
Thanks & Regards
Sachin Aggarwal
7760502772
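Putting Hemant's suggestion into code: the None.get in countByIp goes away if you flatMap over the Option the parser returns, so unparseable lines are simply dropped. The sketch below uses plain Scala collections so it runs standalone; SimpleParser and LogEntry are toy stand-ins for AccessLogParser and its record type, which I don't have the definition of.

```scala
object SafeCountByIp {
  case class LogEntry(clientIpAddress: String)

  // Toy parser: returns None for lines it cannot parse (e.g. empty lines),
  // mimicking the assumed Option-returning AccessLogParser.parseRecord.
  object SimpleParser {
    def parseRecord(line: String): Option[LogEntry] =
      line.split(" ").headOption.filter(_.nonEmpty).map(ip => LogEntry(ip))
  }

  // flatMap silently discards the None results, so the subsequent map
  // only ever sees successfully parsed entries - no entry.get needed.
  def countByIp(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(SimpleParser.parseRecord)
      .map(entry => (entry.clientIpAddress, 1))
      .groupBy(_._1)
      .map { case (ip, hits) => (ip, hits.size) }

  def main(args: Array[String]): Unit =
    println(countByIp(Seq("1.2.3.4 GET /", "", "1.2.3.4 GET /x")))
}
```

On the DStream the same shape applies, since Options flatten into DStreams the same way: `lines.flatMap(parser.parseRecord).map(e => (e.clientIpAddress, 1)).reduceByKey(_ + _)`.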