Hey Eno,

I think this makes sense. I do think people who spend time running production stream processing systems will, over time, end up strongly preferring the current behavior of failing and fixing the root problem rather than skipping, but we don't need to force this on people as long as the default is to fail.
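To make "fail by default, skip only if you opt in" concrete, here is a minimal Java sketch under loudly stated assumptions. Everything in it is hypothetical and for illustration only; it is not the Kafka Streams API and not what the KIP specifies. It borrows the ProcessingExceptionHandler and LogAndThresholdExceptionHandler names from this thread, but HandlerResponse, the onSuccess() hook (a running rate needs the runtime to report successful records too), the constructor parameters standing in for configs, and HandlerDemo.processOne as a stand-in for the runtime's call site are all invented here. The default, failFast(), logs and fails. The opt-in threshold uses an exponentially weighted moving average of failures, which is just one cheap running-rate option: a minimum sample count keeps 1/1 failures from tripping it, and geometric decay keeps a long clean history from masking a sustained burst. The handler receives the original input record, and a FAIL response rethrows the original exception instance so its stack trace is preserved.

```java
import java.util.Optional;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical types for illustration; this is NOT the Kafka Streams API.
enum HandlerResponse { CONTINUE, FAIL }

interface ProcessingExceptionHandler<R> {
    // Called with the original input record when any processing step throws.
    HandlerResponse handle(R input, Exception exception);

    // Assumed hook: the runtime reports each successfully processed record.
    default void onSuccess() {}
}

// Not thread-safe: assumes one handler instance per processing thread.
final class LogAndThresholdExceptionHandler<R> implements ProcessingExceptionHandler<R> {
    private static final Logger LOG =
            Logger.getLogger(LogAndThresholdExceptionHandler.class.getName());

    private final double maxFailureRate; // 0.0 (the default) fails on every error
    private final double alpha;          // EWMA weight; memory is roughly 1/alpha records
    private final long minSamples;       // no FAIL decision before this many records

    private double failureRate = 0.0;    // EWMA of outcomes: 1 = failure, 0 = success
    private long samples = 0;

    LogAndThresholdExceptionHandler(double maxFailureRate, double alpha, long minSamples) {
        if (alpha <= 0.0 || alpha > 1.0) {
            throw new IllegalArgumentException("alpha must be in (0, 1]");
        }
        this.maxFailureRate = maxFailureRate;
        this.alpha = alpha;
        this.minSamples = minSamples;
    }

    // Default configuration: log the error, then fail.
    static <T> LogAndThresholdExceptionHandler<T> failFast() {
        return new LogAndThresholdExceptionHandler<>(0.0, 0.01, 0);
    }

    @Override
    public void onSuccess() {
        update(0.0);
    }

    @Override
    public HandlerResponse handle(R input, Exception exception) {
        update(1.0);
        LOG.log(Level.WARNING, "Failed to process record " + input, exception);
        if (maxFailureRate <= 0.0) {
            return HandlerResponse.FAIL;     // fail-by-default behavior
        }
        if (samples < minSamples) {
            return HandlerResponse.CONTINUE; // too few records: avoids "1/1 failures = 100%"
        }
        // Old outcomes decay geometrically, so a long clean history cannot mask a burst.
        return failureRate > maxFailureRate ? HandlerResponse.FAIL : HandlerResponse.CONTINUE;
    }

    double failureRate() {
        return failureRate;
    }

    private void update(double outcome) {
        samples++;
        failureRate = alpha * outcome + (1.0 - alpha) * failureRate;
    }
}

final class HandlerDemo {
    // Hypothetical call site: runs user code (e.g. a map function) on the input record.
    static <R, V> Optional<V> processOne(R input, Function<R, V> userCode,
                                         ProcessingExceptionHandler<R> handler) {
        try {
            V result = userCode.apply(input);
            handler.onSuccess();
            return Optional.ofNullable(result);
        } catch (RuntimeException e) {
            if (handler.handle(input, e) == HandlerResponse.FAIL) {
                throw e; // rethrow the same instance so the original stack trace survives
            }
            return Optional.empty(); // skip the record
        }
    }
}
```

With alpha = 0.01 the average has a memory of roughly 100 records, so no matter how long the clean history is, a run of consecutive failures pushes the rate past 0.5 within about 70 records. Tuning alpha trades responsiveness against sensitivity to noise.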
One thing I'm confused about is the scope of the proposal. I think the plan is for this to cover all exceptions, whether they occur in serializers or in ANY user code. Is that right? So if I do stream.map(x => x.header.timestamp) and that throws a NullPointerException, the handler would be triggered? If so, my understanding is that what gets passed to me is the original consumer record, not the value x that produced the NullPointerException. Is that right? If this understanding is correct, then the name RecordExceptionHandler should maybe be something like ProcessingExceptionHandler, since the exception isn't necessarily tied directly to an input record, right?

A couple of other comments:

- It's important that we maintain the original stack trace when we rethrow the exception (probably obvious, but thought I'd mention it).
- As a matter of style, I'd advocate for a single DefaultExceptionHandler that logs the error, plus configs that control when (if ever) it fails. That allows adding further useful options (such as the dead letter thing, retries, etc.) in a way that can be combined. Basically the point is that these facilities aren't "either/or". Also, you mention adding configs for these in the existing proposal; it'd be good to say what the configs are.
- I think we should hold off on retries until we have worked out the full usage pattern; people can always implement their own. I think the idea is that you send the message to some kind of dead letter queue and then replay those messages later. This obviously destroys all the semantic guarantees we are working hard to provide right now, which may be okay.
- I agree that the LogAndThresholdExceptionHandler is closest to what most people think they want. I think making the exception handler stateful is probably fine, since this is inherently an approximate threshold. I do think this is a bit more complex than it sounds, though, since you'll need to compute some kind of cheap running rate.
Obviously, the two failure modes you'd need to avoid are treating 1/1 failures as 100%, OR, conversely, running successfully for a year and then failing 100% of the time without that being caught because of the excess prior history.

-Jay

On Thu, May 25, 2017 at 2:47 AM, Eno Thereska <eno.there...@gmail.com> wrote:
> Hi there,
>
> I’ve added a KIP on improving exception handling in streams:
> KIP-161: streams record processing exception handlers.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+record+processing+exception+handlers
>
> Discussion and feedback is welcome, thank you.
> Eno