I've opened this pull request to implement the KIP as currently written: https://github.com/apache/kafka/pull/4165. It still needs some tests added, but largely represents the shape I was going for.
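For anyone skimming, here is a minimal sketch of the handler shape discussed in the quoted thread below. It is not copied from the PR. The `handle(record, exception)` signature and the `ProductionExceptionHandlerResponse` name come from the thread; the `CONTINUE`/`FAIL` values, the `ProductionExceptionHandler` interface name, the two example handlers, and the stand-in `ProducerRecord` class (used only so the sketch compiles without Kafka on the classpath) are assumptions for illustration.

```java
class ProductionHandlerSketch {

    // Stand-in for org.apache.kafka.clients.producer.ProducerRecord (assumption:
    // only topic, key and value are needed for this illustration).
    static final class ProducerRecord<K, V> {
        final String topic;
        final K key;
        final V value;

        ProducerRecord(final String topic, final K key, final V value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    // Assumed response values, in the CONTINUE/FAIL style of KIP-161.
    enum ProductionExceptionHandlerResponse { CONTINUE, FAIL }

    // Hypothetical interface name; the method signature follows the thread.
    // No ProcessorContext is passed, because its state may have changed by the
    // time the producer callback runs.
    interface ProductionExceptionHandler {
        ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                  final Exception exception);
    }

    // Case 2 from the thread: log the error, drop the record, keep going.
    static final class LogAndContinueHandler implements ProductionExceptionHandler {
        @Override
        public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                         final Exception exception) {
            System.err.println("Dropping record for topic " + record.topic + ": " + exception);
            return ProductionExceptionHandlerResponse.CONTINUE;
        }
    }

    // Case 1 from the thread: fail fast so the app can restart from earlier offsets.
    static final class FailFastHandler implements ProductionExceptionHandler {
        @Override
        public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                         final Exception exception) {
            return ProductionExceptionHandlerResponse.FAIL;
        }
    }

    public static void main(final String[] args) {
        final ProducerRecord<byte[], byte[]> record =
            new ProducerRecord<>("output-topic", null, new byte[] {1, 2, 3});
        final Exception failure = new RuntimeException("simulated produce failure");

        System.out.println(new LogAndContinueHandler().handle(record, failure));
        System.out.println(new FailFastHandler().handle(record, failure));
    }
}
```

In a real application the chosen handler class would presumably be named in the streams config, under something like the "default.production.exception.handler" key proposed in the thread.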
If there are more points that folks would like to discuss, please let me know. If I don't hear anything by tomorrow afternoon I'll probably start a [VOTE] thread.

Thanks,
Matt

On Fri, Oct 27, 2017 at 7:33 PM Matt Farmer <m...@frmr.me> wrote:

> I can’t think of a reason that would be problematic.
>
> Most of the time I would write a handler like this, I either want to
> ignore the error or fail and bring everything down so that I can spin it
> back up later and resume from earlier offsets. When we start up after
> crashing we’ll eventually try to process the message we failed to produce
> again.
>
> I’m concerned that “putting in a queue for later” opens you up to putting
> messages into the destination topic in an unexpected order. However if
> others feel differently, I’m happy to talk about it.
>
> On Fri, Oct 27, 2017 at 7:10 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
>> > Please correct me if I'm wrong, but my understanding is that the record
>> > metadata is always null if an exception occurred while trying to produce.
>>
>> That is right. Thanks.
>>
>> I looked at the example code, and one thing I realized that since we are
>> not passing the context in the handle function, we may not be implement the
>> logic to send the fail records into another queue for future processing.
>> Would people think that would be a big issue?
>>
>>
>> Guozhang
>>
>>
>> On Thu, Oct 26, 2017 at 12:14 PM, Matt Farmer <m...@frmr.me> wrote:
>>
>> > Hello all,
>> >
>> > I've updated the KIP based on this conversation, and made it so that its
>> > interface, config setting, and parameters line up more closely with the
>> > interface in KIP-161 (deserialization handler).
>> >
>> > I believe there are a few specific questions I need to reply to.....
>> >
>> > > The question I had about then handle parameters are around the record,
>> > > should it be `ProducerRecord<byte[], byte[]>`, or be generics of
>> > > `ProducerRecord<? extends K, ? extends V>` or `ProducerRecord<? extends
>> > > Object, ? extends Object>`?
>> >
>> > At this point in the code we're guaranteed that this is a
>> > ProducerRecord<byte[], byte[]>, so the generics would just make it harder
>> > to work with the key and value.
>> >
>> > > Also, should the handle function include the `RecordMetadata` as well in
>> > > case it is not null?
>> >
>> > Please correct me if I'm wrong, but my understanding is that the record
>> > metadata is always null if an exception occurred while trying to produce.
>> >
>> > > We may probably try to write down at least the following handling logic and
>> > > see if the given API is sufficient for it
>> >
>> > I've added some examples to the KIP. Let me know what you think.
>> >
>> > Cheers,
>> > Matt
>> >
>> > On Mon, Oct 23, 2017 at 9:00 PM Matt Farmer <m...@frmr.me> wrote:
>> >
>> > > Thanks for this feedback. I’m at a conference right now and am planning on
>> > > updating the KIP again with details from this conversation later this week.
>> > >
>> > > I’ll shoot you a more detailed response then! :)
>> > > On Mon, Oct 23, 2017 at 8:16 PM Guozhang Wang <wangg...@gmail.com> wrote:
>> > >
>> > >> Thanks for the KIP Matt.
>> > >>
>> > >> Regarding the handle interface of ProductionExceptionHandlerResponse, could
>> > >> you write it on the wiki also, along with the actual added config names
>> > >> (e.g. what
>> > >>
>> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers
>> > >> described).
>> > >>
>> > >> The question I had about then handle parameters are around the record,
>> > >> should it be `ProducerRecord<byte[], byte[]>`, or be generics of
>> > >> `ProducerRecord<? extends K, ? extends V>` or `ProducerRecord<? extends
>> > >> Object, ? extends Object>`?
>> > >>
>> > >> Also, should the handle function include the `RecordMetadata` as well in
>> > >> case it is not null?
>> > >>
>> > >> We may probably try to write down at least the following handling logic and
>> > >> see if the given API is sufficient for it: 1) throw exception immediately
>> > >> to fail fast and stop the world, 2) log the error and drop record and
>> > >> proceed silently, 3) send such errors to a specific "error" Kafka topic, or
>> > >> record it as an app-level metrics (
>> > >> https://kafka.apache.org/documentation/#kafka_streams_monitoring) for
>> > >> monitoring.
>> > >>
>> > >> Guozhang
>> > >>
>> > >>
>> > >>
>> > >> On Fri, Oct 20, 2017 at 5:47 PM, Matt Farmer <m...@frmr.me> wrote:
>> > >>
>> > >> > I did some more digging tonight.
>> > >> >
>> > >> > @Ted: It looks like the deserialization handler uses
>> > >> > "default.deserialization.exception.handler" for the config name. No
>> > >> > ".class" on the end. I'm inclined to think this should use
>> > >> > "default.production.exception.handler".
>> > >> >
>> > >> > On Fri, Oct 20, 2017 at 8:22 PM Matt Farmer <m...@frmr.me> wrote:
>> > >> >
>> > >> > > Okay, I've dug into this a little bit.
>> > >> > >
>> > >> > > I think getting access to the serialized record is possible, and changing
>> > >> > > the naming and return type is certainly doable. However, because we're
>> > >> > > hooking into the onCompletion callback we have no guarantee that the
>> > >> > > ProcessorContext state hasn't changed by the time this particular handler
>> > >> > > runs. So I think the signature would change to something like:
>> > >> > >
>> > >> > > ProductionExceptionHandlerResponse handle(final ProducerRecord<..> record,
>> > >> > > final Exception exception)
>> > >> > >
>> > >> > > Would this be acceptable?
>> > >> > >
>> > >> > > On Thu, Oct 19, 2017 at 7:33 PM Matt Farmer <m...@frmr.me> wrote:
>> > >> > >
>> > >> > >> Ah good idea. Hmmm. I can line up the naming and return type but I’m not
>> > >> > >> sure if I can get my hands on the context and the record itself without
>> > >> > >> other changes.
>> > >> > >>
>> > >> > >> Let me dig in and follow up here tomorrow.
>> > >> > >> On Thu, Oct 19, 2017 at 7:14 PM Matthias J. Sax <matth...@confluent.io> wrote:
>> > >> > >>
>> > >> > >>> Thanks for the KIP.
>> > >> > >>>
>> > >> > >>> Are you familiar with KIP-161?
>> > >> > >>>
>> > >> > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers
>> > >> > >>>
>> > >> > >>> I thinks, we should align the design (parameter naming, return types,
>> > >> > >>> class names etc) of KIP-210 to KIP-161 to get a unified user experience.
>> > >> > >>>
>> > >> > >>>
>> > >> > >>>
>> > >> > >>> -Matthias
>> > >> > >>>
>> > >> > >>>
>> > >> > >>> On 10/18/17 4:20 PM, Matt Farmer wrote:
>> > >> > >>> > I’ll create the JIRA ticket.
>> > >> > >>> >
>> > >> > >>> > I think that config name will work. I’ll update the KIP accordingly.
>> > >> > >>> > On Wed, Oct 18, 2017 at 6:09 PM Ted Yu <yuzhih...@gmail.com> wrote:
>> > >> > >>> >
>> > >> > >>> >> Can you create JIRA that corresponds to the KIP ?
>> > >> > >>> >>
>> > >> > >>> >> For the new config, how about naming it
>> > >> > >>> >> production.exception.processor.class
>> > >> > >>> >> ? This way it is clear that class name should be specified.
>> > >> > >>> >>
>> > >> > >>> >> Cheers
>> > >> > >>> >>
>> > >> > >>> >> On Wed, Oct 18, 2017 at 2:40 PM, Matt Farmer <m...@frmr.me> wrote:
>> > >> > >>> >>
>> > >> > >>> >>> Hello everyone,
>> > >> > >>> >>>
>> > >> > >>> >>> This is the discussion thread for the KIP that I just filed here:
>> > >> > >>> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce
>> > >> > >>> >>>
>> > >> > >>> >>> Looking forward to getting some feedback from folks about this idea and
>> > >> > >>> >>> working toward a solution we can contribute back. :)
>> > >> > >>> >>>
>> > >> > >>> >>> Cheers,
>> > >> > >>> >>> Matt Farmer
>> > >> > >>> >>>
>> > >> > >>> >>
>> > >> > >>> >
>> > >> > >>>
>> > >> > >>
>> > >> > >
>> > >> >
>> > >>
>> > >>
>> > >> --
>> > >> -- Guozhang
>> > >>
>> > >
>> >
>>
>>
>> --
>> -- Guozhang
>>