[
https://issues.apache.org/jira/browse/SPARK-17397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15466183#comment-15466183
]
Spiro Michaylov edited comment on SPARK-17397 at 9/6/16 2:24 AM:
-----------------------------------------------------------------
Apologies: I originally intended to file this as a doc bug, and perhaps I should
have, although I suspect there's a feature enhancement to file as well. The user
list may have been a better place to hash that out. I mostly had more mundane
exceptions in mind, such as a custom exception thrown while processing a stream,
as in this fragment from my code above.
{code}
stream
  .map(x => { throw new SomeException("something"); x })
  .foreachRDD(r => println("*** count = " + r.count()))
{code}
I got the impression from the unit tests (StreamingContextSuite) that such
processing exceptions were intended to propagate through awaitTermination(), and
indeed they do.
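As a self-contained illustration of that propagate-through-await behavior (plain Scala, no Spark; MiniContext is a hypothetical stand-in, not Spark's actual implementation), note that once the first error is stored and never cleared, every later await surfaces the same exception again, which matches what I observe below:

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical simplification of an await-style call that surfaces an
// exception thrown on another thread: the first failure is stored and
// rethrown by awaitTermination(). Not Spark's real code.
object AwaitDemo {
  class MiniContext {
    private val firstError = new AtomicReference[Throwable](null)

    // Called by the processing thread when a batch fails; only the
    // first error wins.
    def reportError(t: Throwable): Unit = firstError.compareAndSet(null, t)

    // Rethrows the stored error. It is never cleared, so every
    // subsequent call throws the *same* exception again.
    def awaitTermination(): Unit = {
      val e = firstError.get()
      if (e != null) throw e
    }
  }

  // Await twice and collect the messages of whatever was thrown.
  def caughtMessages(): List[String] = {
    val ctx = new MiniContext
    ctx.reportError(new RuntimeException("batch 1 failed"))
    (1 to 2).toList.map { _ =>
      try { ctx.awaitTermination(); "no error" }
      catch { case e: RuntimeException => e.getMessage }
    }
  }

  def main(args: Array[String]): Unit = println(caughtMessages())
}
```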
Insofar as the intended usage is an orderly shutdown of the StreamingContext
when the first such exception propagates, I agree it's just a doc issue, and
perhaps the intended usage pattern is something like:
{code}
try {
  ssc.awaitTermination()
  println("*** streaming terminated")
} catch {
  case e: Exception =>
    ssc.stop() // ADDED THIS AS A WAY TO GET AN ORDERLY SHUTDOWN
}
{code}
But for resilient streaming applications it seems you'd want the option of
seeing which exceptions have propagated, how severe they are, and how many, so
you'd want (at least _I_ would want) something like:
{code}
var terminated = false
var nastyExceptionCount = 0
while (!terminated && nastyExceptionCount < 100) {
  try {
    ssc.awaitTermination()
    terminated = true
    println("*** streaming terminated")
  } catch {
    case ne: NastyException =>
      // log it and count it, but don't panic
      nastyExceptionCount += 1
    case e: Exception =>
      // yawn!
  }
}
if (!terminated) ssc.stop() // because we got too many nasty exceptions
{code}
However, that doesn't seem to be supported (it looks like the first exception is
rethrown every time awaitTermination() is called), so I'm inclined to file this
as either a bug or a feature request, according to your taste, or just drop it
if you think what I'm proposing is inappropriate.
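To make concrete what I'm asking for, here's a self-contained sketch (plain Scala, no Spark; NastyContext is a hypothetical stand-in whose await surfaces a fresh exception each time, which is the behavior the loop above needs):

```scala
// Sketch of the desired behavior: each await surfaces the *next* error
// rather than repeating the first, so a supervising loop can count
// failures and decide when to give up. NastyContext is hypothetical.
object RetryLoopDemo {
  class NastyException(msg: String) extends RuntimeException(msg)

  // Stand-in context: throws a given number of transient errors,
  // then terminates cleanly.
  class NastyContext(errors: Int) {
    private var remaining = errors
    def awaitTermination(): Unit =
      if (remaining > 0) {
        remaining -= 1
        throw new NastyException("transient #" + remaining)
      }
    def stop(): Unit = ()
  }

  // The loop from the comment above: tolerate up to `limit` nasty
  // exceptions before forcing a stop. Returns (terminated, count).
  def superviseUntilDone(ctx: NastyContext, limit: Int): (Boolean, Int) = {
    var terminated = false
    var nastyCount = 0
    while (!terminated && nastyCount < limit) {
      try {
        ctx.awaitTermination()
        terminated = true
      } catch {
        case _: NastyException =>
          nastyCount += 1 // log it and count it, but don't panic
        case _: Exception =>
          // yawn!
      }
    }
    if (!terminated) ctx.stop() // too many nasty exceptions
    (terminated, nastyCount)
  }

  def main(args: Array[String]): Unit =
    println(superviseUntilDone(new NastyContext(3), 100))
}
```

With three transient errors and a limit of 100, the loop rides out all three and terminates cleanly; with more errors than the limit, it gives up and stops the context.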
In any case, I don't want to abuse the process more than I already have: I'm
happy to either drop this, take it to the user list, or open other tickets if
desired and close this one. In fact, I notice I can still turn this into a bug
report. Should I?
> what to do when awaitTermination() throws?
> -------------------------------------------
>
> Key: SPARK-17397
> URL: https://issues.apache.org/jira/browse/SPARK-17397
> Project: Spark
> Issue Type: Question
> Components: Streaming
> Affects Versions: 2.0.0
> Environment: Linux, Scala but probably general
> Reporter: Spiro Michaylov
> Priority: Minor
>
> When awaitTermination propagates an exception that was thrown in processing a
> batch, the StreamingContext keeps running. Perhaps this is by design, but I
> don't see any mention of it in the API docs or the streaming programming
> guide. It's not clear what idiom should be used to block the thread until the
> context HAS been stopped in a situation where stream processing is throwing
> lots of exceptions.
> For example, in the following, streaming takes the full 30 seconds to
> terminate. My hope in asking this is to improve my own understanding and
> perhaps inspire documentation improvements. I'm not filing a bug because it's
> not clear to me whether this is working as intended.
> {code}
> val conf = new SparkConf().setAppName("ExceptionPropagation").setMaster("local[4]")
> val sc = new SparkContext(conf)
> // streams will produce data every second
> val ssc = new StreamingContext(sc, Seconds(1))
> val qm = new QueueMaker(sc, ssc)
> // create the stream
> val stream = // create some stream
> // register for data
> stream
>   .map(x => { throw new SomeException("something"); x })
>   .foreachRDD(r => println("*** count = " + r.count()))
> // start streaming
> ssc.start()
> new Thread("Delayed Termination") {
>   override def run() {
>     Thread.sleep(30000)
>     ssc.stop()
>   }
> }.start()
> println("*** producing data")
> // start producing data
> qm.populateQueue()
> try {
>   ssc.awaitTermination()
>   println("*** streaming terminated")
> } catch {
>   case e: Exception =>
>     println("*** streaming exception caught in monitor thread")
> }
> // if the above goes down the exception path, there seems no
> // good way to block here until the streaming context is stopped
> println("*** done")
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)