Hey Eno,

I think this makes sense. I do think people who spend time running
production stream processing systems will, over time, end up strongly
preferring the current behavior of failing and fixing the root problem
rather than skipping, but we don't need to force this on people as long as
the default is to fail.

One thing I'm confused about is the scope of the proposal. I think the plan
is that this would cover all exceptions that occur whether in serializers
or ANY user code? Is that right? So if I do stream.map(x =>
x.header.timestamp) and that throws a NullPointerException, this would be
triggered? If so what I understand is that what is passed in to me is the
original consumer record, not the value x that produced the null pointer
exception? Is that right? If this understanding is correct then the
name RecordExceptionHandler should maybe be something like
ProcessingExceptionHandler since the exception isn't necessarily directly
tied to an input Record, right?

A couple of other comments:

   - It's important we maintain the original stack trace when we rethrow
   the exception (probably obvious, but thought I'd mention it)
   - As a matter of style I'd advocate for making a single
   DefaultExceptionHandler which logs the error and adding configs for this to
   control when (if ever) it fails. This will allow adding additional useful
   options in a way that can be combined (such as the dead letter thing,
   retries, etc). Basically the point is that these facilities aren't
   "either/or". Also you mention adding configs for these in the existing
   proposal, it'd be good to say what the configs are.
   - I think we should hold off on retries unless we have worked out the
   full usage pattern, people can always implement their own. I think the idea
   is that you send the message to some kind of dead letter queue and then
   replay these later. This obviously destroys all semantic guarantees we are
   working hard to provide right now, which may be okay.
   - I agree that the LogAndThresholdExceptionHandler is closest to what
   most people think they want. I think making the exception handler stateful
   is probably fine since this is inherently an approximate threshold. I do
   think this is a bit more complex then it sounds though since you'll
   obviously need to compute some kind of cheap running rate. Obviously the
   two failure modes you'd need to avoid are that 1/1 failures = 100% OR
   conversely that it runs successfully for one year and then fails 100% of
   the time but that isn't caught because of the excess prior history.

-Jay


On Thu, May 25, 2017 at 2:47 AM, Eno Thereska <eno.there...@gmail.com>
wrote:

> Hi there,
>
> I’ve added a KIP on improving exception handling in streams:
> KIP-161: streams record processing exception handlers.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 161%3A+streams+record+processing+exception+handlers <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161:+streams+record+
> processing+exception+handlers>
>
> Discussion and feedback is welcome, thank you.
> Eno

Reply via email to