Hi,

I used this code for graceful shutdown of my streaming app; it may not be
the best way, so please correct me if there is a better approach.

sys.ShutdownHookThread {
  println("Gracefully stopping Spark Streaming Application")
  // stopSparkContext = true, stopGracefully = true
  ssc.stop(stopSparkContext = true, stopGracefully = true)
  println("Application stopped")
}

class StopContextThread(ssc: StreamingContext) extends Runnable {
  def run(): Unit = {
    // Graceful stop: lets already-received batches finish processing first
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}

and I use this whenever I want to start a graceful shutdown:

val thread: Thread = new Thread(new StopContextThread(ssc))

thread.start()


On Tue, Mar 1, 2016 at 2:32 PM, Lars Albertsson <la...@mapflat.com> wrote:

> If you wait for an inactivity period before ending a test case, you
> get the choice of using a long timeout, resulting in slow tests, or a
> short timeout, resulting in brittle tests. Both options will make
> developers waste time, and harm developer productivity.
>
> I suggest that you terminate the test case based on the test predicate
> getting fulfilled, with a long timeout in case of test failure. I
> presume that your application produces output to a channel, e.g.
> database or Kafka topic, which the test oracle can inspect for test
> completeness.
>
> The flow becomes:
>
> 1. Start the stream source component and output component locally,
> e.g. Kafka+Cassandra
> 2. Start the Spark streaming application, typically with a local master.
> 3. Feed the input into the stream source, e.g. a Kafka test topic.
> 4. Let the test oracle loop, polling for a condition to be met on the
> output, e.g. existence of a database entry or a message on the Kafka
> output topic. Sleep for a short period (10 ms) between polls, and fail
> the test if the condition is not met after a long time (30 s); a
> sketch of this polling loop follows the list.
> 5. Terminate the streaming application.
> 6. Terminate the stream source and output components.
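>
> A minimal sketch of the polling loop in step 4 (checkOutput is a
> placeholder for whatever inspects your database or output topic, and
> fail() comes from your test framework, e.g. ScalaTest):
>
> def awaitCondition(checkOutput: () => Boolean,
>                    timeoutMs: Long = 30000,
>                    pollMs: Long = 10): Unit = {
>   val deadline = System.currentTimeMillis() + timeoutMs
>   while (!checkOutput()) {
>     if (System.currentTimeMillis() > deadline)
>       fail(s"Condition not met within $timeoutMs ms")
>     Thread.sleep(pollMs)  // short sleep keeps the test responsive
>   }
> }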
>
> I have used this strategy for testing both financial transaction
> systems and Spark Streaming applications, and it has resulted in both
> fast and reliable tests, without strong coupling between production
> code and test code.
>
> The base strategy fails if you need to test for the absence of an
> output event, e.g. when my streaming application sees message X, it
> should filter it and not produce output. You then need to send another
> input event (poison pill pattern), and terminate on the output effects
> of the poison pill event.
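>
> A rough sketch of the poison pill step, assuming a Kafka input topic
> and the standard Kafka producer API; producerProps and outputContains
> are placeholders for your own setup, and awaitCondition is the helper
> sketched above:
>
> import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
>
> // Send a sentinel after the real test input, then terminate on its
> // observable effect rather than on a quiet period.
> val producer = new KafkaProducer[String, String](producerProps)
> producer.send(new ProducerRecord[String, String]("input-topic", "POISON_PILL"))
> producer.flush()
> awaitCondition(() => outputContains("POISON_PILL"))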
>
> If you test with multiple streaming executors, remember that there are
> no order guarantees between executors; you will need to make sure that
> all executors receive poison pills, and test for the pill effects from
> all of them.
>
> Starting the source+output components and the Spark context can be
> slow. I recommend that you provide the option to reuse the test
> fixture between test cases for speed. For example, if you start and
> stop the fixture once for each test class, rather than once per test
> method, you save a lot of time.
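>
> With ScalaTest, for instance, per-class reuse can look roughly like
> this (startKafka/startSpark and their stop counterparts stand in for
> whatever your fixture needs):
>
> import org.scalatest.{BeforeAndAfterAll, FunSuite}
>
> class StreamingSuite extends FunSuite with BeforeAndAfterAll {
>   // Runs once per test class, not once per test method.
>   override def beforeAll(): Unit = { startKafka(); startSpark() }
>   override def afterAll(): Unit = { stopSpark(); stopKafka() }
> }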
>
> Regards,
>
>
>
> Lars Albertsson
> Data engineering consultant
> www.mapflat.com
> +46 70 7687109
>
>
> On Fri, Feb 26, 2016 at 8:24 AM, Mao, Wei <wei....@intel.com> wrote:
> > I would argue against making it configurable unless there is a real
> > production use case. If it's just for testing, there are a bunch of
> > ways to achieve it. For example, you can set a global mark when the
> > test stream is finished, and stop ssc on another thread when the
> > status of that mark changes.
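> >
> > A rough sketch of that idea (the flag object and monitor thread are
> > illustrative, not a Spark API):
> >
> > object TestShutdown { @volatile var finished = false }
> >
> > // Monitor thread: waits for the mark to flip, then stops the context.
> > new Thread(new Runnable {
> >   def run(): Unit = {
> >     while (!TestShutdown.finished) Thread.sleep(100)
> >     ssc.stop(stopSparkContext = true, stopGracefully = true)
> >   }
> > }).start()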
> >
> >
> >
> > Back to the original exception: blindly calling "Option.get" is never
> > a good practice. It would be better to pre-validate or use
> > getOption/getOrElse.
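> >
> > For instance, assuming parseRecord returns an Option as in the code
> > below:
> >
> > // Unsafe: .get throws NoSuchElementException when the Option is None
> > val ip = parser.parseRecord(line).get.clientIpAddress
> >
> > // Safer: map over the Option and supply a default
> > val safeIp =
> >   parser.parseRecord(line).map(_.clientIpAddress).getOrElse("unknown")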
> >
> >
> >
> > Thanks,
> >
> > William
> >
> >
> >
> > From: Cheng, Hao [mailto:hao.ch...@intel.com]
> > Sent: Thursday, February 25, 2016 1:03 AM
> > To: Daniel Siegmann; Ashutosh Kumar
> > Cc: Hemant Bhanawat; Ted Yu; Femi Anthony; user
> > Subject: RE: Spark Streaming - graceful shutdown when stream has no more
> > data
> >
> >
> >
> > This is very interesting: how to shut down the streaming job
> > gracefully once there has been no input data for some time.
> >
> >
> >
> > A doable solution: you can count the input data using an Accumulator,
> > and another thread (on the master node) can keep fetching the latest
> > accumulator value; if the value has not changed for some time, shut
> > down the streaming job.
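> >
> > A sketch of that approach with the Spark 1.x accumulator API (the
> > idle threshold and poll interval are arbitrary choices):
> >
> > val recordCount = ssc.sparkContext.accumulator(0L, "input records")
> > lines.foreachRDD(rdd => recordCount += rdd.count())
> >
> > new Thread(new Runnable {
> >   def run(): Unit = {
> >     var last = -1L
> >     var idlePolls = 0
> >     while (idlePolls < 6) {  // ~1 minute with no new records
> >       Thread.sleep(10000)
> >       val current = recordCount.value
> >       if (current == last) idlePolls += 1 else idlePolls = 0
> >       last = current
> >     }
> >     ssc.stop(stopSparkContext = true, stopGracefully = true)
> >   }
> > }).start()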
> >
> >
> >
> > From: Daniel Siegmann [mailto:daniel.siegm...@teamaol.com]
> > Sent: Wednesday, February 24, 2016 12:30 AM
> > To: Ashutosh Kumar <kmr.ashutos...@gmail.com>
> > Cc: Hemant Bhanawat <hemant9...@gmail.com>; Ted Yu <yuzhih...@gmail.com>;
> > Femi Anthony <femib...@gmail.com>; user <user@spark.apache.org>
> > Subject: Re: Spark Streaming - graceful shutdown when stream has no more
> > data
> >
> >
> >
> > During testing you will typically be using some finite data. You want
> > the stream to shut down automatically when that data has been consumed,
> > so your test shuts down gracefully.
> >
> > Of course once the code is running in production you'll want it to keep
> > waiting for new records. So whether the stream shuts down when there's no
> > more data should be configurable.
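> >
> > One way to wire that up (the property name is made up, not a real
> > Spark setting, and conf is your SparkConf):
> >
> > val stopWhenDone = conf.getBoolean("myapp.streaming.stopWhenDone", false)
> > ssc.start()
> > if (stopWhenDone) {
> >   // test mode: detect end of input, e.g. via a poison pill or the
> >   // idle-accumulator technique discussed elsewhere in this thread
> >   awaitInputConsumed()  // placeholder for your chosen detection
> >   ssc.stop(stopSparkContext = true, stopGracefully = true)
> > } else {
> >   ssc.awaitTermination()  // production: keep waiting for new records
> > }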
> >
> >
> >
> > On Tue, Feb 23, 2016 at 11:09 AM, Ashutosh Kumar
> > <kmr.ashutos...@gmail.com> wrote:
> >
> > Just out of curiosity, I would like to know why a streaming program
> > should shut down when no new data is arriving. I think it should keep
> > waiting for the arrival of new records.
> >
> > Thanks
> >
> > Ashutosh
> >
> >
> >
> > On Tue, Feb 23, 2016 at 9:17 PM, Hemant Bhanawat <hemant9...@gmail.com>
> > wrote:
> >
> > A guess: parseRecord is returning None in some cases (probably empty
> > lines), and then entry.get is throwing the exception.
> >
> > You may want to filter the None values from accessLogDStream before
> > you run the map function over it.
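> >
> > For example, assuming parseRecord returns an Option, countByIp could
> > drop the None values with flatMap instead of calling .get:
> >
> > def countByIp(lines: DStream[String]) = {
> >   val parser = new AccessLogParser
> >   lines.flatMap(line => parser.parseRecord(line).toSeq) // None dropped
> >     .map(entry => (entry.clientIpAddress, 1))
> >     .reduceByKey(_ + _)
> > }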
> >
> > Hemant
> >
> >
> > Hemant Bhanawat
> >
> > www.snappydata.io
> >
> >
> >
> > On Tue, Feb 23, 2016 at 6:00 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> >
> > Which line is line 42 in your code?
> >
> >
> >
> > When the variable lines becomes empty, you can stop your program.
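> >
> > For instance (a sketch; the threshold and the tracker object are
> > illustrative):
> >
> > object IdleTracker { @volatile var emptyBatches = 0 }
> >
> > lines.foreachRDD { rdd =>
> >   // foreachRDD bodies run on the driver, so this is safe to mutate
> >   if (rdd.isEmpty()) IdleTracker.emptyBatches += 1
> >   else IdleTracker.emptyBatches = 0
> > }
> > // ...then a separate thread can call ssc.stop(true, true) once
> > // IdleTracker.emptyBatches crosses some threshold.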
> >
> >
> >
> > Cheers
> >
> >
> > On Feb 23, 2016, at 12:25 AM, Femi Anthony <femib...@gmail.com> wrote:
> >
> > I am working with the Spark Streaming API and I wish to stream a set
> > of pre-downloaded web log files continuously to simulate a real-time
> > stream. I wrote a script that gunzips the compressed logs and pipes
> > the output to nc on port 7777.
> >
> > The script looks like this:
> >
> > BASEDIR=/home/mysuer/data/datamining/internet_traffic_archive
> > zipped_files=`find $BASEDIR -name "*.gz"`
> >
> > for zfile in $zipped_files
> > do
> >   echo "Unzipping $zfile..."
> >   gunzip -c $zfile | nc -l -p 7777 -q 20
> > done
> >
> > I have streaming code written in Scala that processes the streams. It
> > works well for the most part, but when it runs out of files to stream
> > I get the following error in Spark:
> >
> > 16/02/19 23:04:35 WARN ReceiverSupervisorImpl:
> > Restarting receiver with delay 2000 ms: Socket data stream had no more data
> >
> > 16/02/19 23:04:35 ERROR ReceiverTracker: Deregistered receiver for stream 0:
> > Restarting receiver with delay 2000ms: Socket data stream had no more data
> >
> > 16/02/19 23:04:35 WARN BlockManager: Block input-0-1455941075600
> > replicated to only 0 peer(s) instead of 1 peers
> >
> > ....
> >
> > 16/02/19 23:04:40 ERROR Executor: Exception in task 2.0 in stage 15.0 (TID 47)
> > java.util.NoSuchElementException: None.get
> >   at scala.None$.get(Option.scala:313)
> >   at scala.None$.get(Option.scala:311)
> >   at com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
> >   at com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
> >
> > How do I implement a graceful shutdown so that the program exits
> > gracefully when it no longer detects any data in the stream?
> >
> > My Spark Streaming code looks like this:
> >
> > import org.apache.log4j.Logger
> > import org.apache.spark.SparkConf
> > import org.apache.spark.streaming.{Seconds, StreamingContext}
> > import org.apache.spark.streaming.dstream.DStream
> > // AccessLogParser comes from a separate log-parsing library
> >
> > object StreamingLogEnhanced {
> >
> >   def main(args: Array[String]): Unit = {
> >     val master = args(0)
> >     val conf =
> >       new SparkConf().setMaster(master).setAppName("StreamingLogEnhanced")
> >     // Create a StreamingContext with an n-second batch size
> >     val ssc = new StreamingContext(conf, Seconds(10))
> >     val log = Logger.getLogger(getClass.getName)
> >
> >     sys.ShutdownHookThread {
> >       log.info("Gracefully stopping Spark Streaming Application")
> >       ssc.stop(true, true)
> >       log.info("Application stopped")
> >     }
> >
> >     // Create a DStream from all the input on port 7777
> >     val lines = ssc.socketTextStream("localhost", 7777)
> >     // Create a count of log hits by ip
> >     var ipCounts = countByIp(lines)
> >     ipCounts.print()
> >
> >     // start our streaming context and wait for it to "finish"
> >     ssc.start()
> >     // Wait up to 10000 * 600 ms, then exit
> >     ssc.awaitTermination(10000 * 600)
> >     ssc.stop()
> >   }
> >
> >   def countByIp(lines: DStream[String]) = {
> >     val parser = new AccessLogParser
> >     val accessLogDStream = lines.map(line => parser.parseRecord(line))
> >     // entry.get throws the None.get exception above when parseRecord
> >     // returns None
> >     val ipDStream = accessLogDStream.map(entry =>
> >       (entry.get.clientIpAddress, 1))
> >     ipDStream.reduceByKey((x, y) => x + y)
> >   }
> > }
> >
> > Thanks in advance for any suggestions.
> >
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 

Thanks & Regards

Sachin Aggarwal
7760502772
