One more comment. You mention a default implementation for the handler that fails. I think this should be part of the public API and thus should have a properly defined name that is mentioned in the KIP.
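The sketch below makes the naming point concrete. It shows a public, named failing default and a log-and-continue counterpart. It is self-contained and is not the KIP's or Kafka's actual API. `SerializedRecord` is a hypothetical stand-in for `ProducerRecord<byte[], byte[]>`, used so the snippet compiles without kafka-clients. The `handle(record, exception)` shape follows the signature proposed further down in the quoted thread. The class names `LogAndFailProductionExceptionHandler` and `LogAndContinueProductionExceptionHandler`, and the `CONTINUE`/`FAIL` constants, are assumptions modeled on KIP-161's style.

```java
import java.util.logging.Logger;

// Hypothetical stand-in for ProducerRecord<byte[], byte[]> so this sketch compiles
// without kafka-clients: by the time the handler runs, key and value are bytes.
final class SerializedRecord {
    final String topic;
    final byte[] key;
    final byte[] value;

    SerializedRecord(String topic, byte[] key, byte[] value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }
}

// Assumed response type, mirroring the CONTINUE/FAIL style of KIP-161.
enum ProductionExceptionHandlerResponse { CONTINUE, FAIL }

// Assumed handler shape: handle(record, exception) with no ProcessorContext,
// since the context may have moved on by the time the send callback fires.
interface ProductionExceptionHandler {
    ProductionExceptionHandlerResponse handle(SerializedRecord record, Exception exception);
}

// Hypothetical public name for the default: log the failure and stop processing.
final class LogAndFailProductionExceptionHandler implements ProductionExceptionHandler {
    private static final Logger LOG =
        Logger.getLogger(LogAndFailProductionExceptionHandler.class.getName());

    @Override
    public ProductionExceptionHandlerResponse handle(SerializedRecord record, Exception exception) {
        LOG.severe("Failed to produce to topic " + record.topic + ", failing: " + exception);
        return ProductionExceptionHandlerResponse.FAIL;
    }
}

// Hypothetical public name for the opt-in alternative: log, drop the record, keep going.
final class LogAndContinueProductionExceptionHandler implements ProductionExceptionHandler {
    private static final Logger LOG =
        Logger.getLogger(LogAndContinueProductionExceptionHandler.class.getName());

    @Override
    public ProductionExceptionHandlerResponse handle(SerializedRecord record, Exception exception) {
        LOG.warning("Failed to produce to topic " + record.topic + ", dropping record: " + exception);
        return ProductionExceptionHandlerResponse.CONTINUE;
    }
}
```

The thread proposes the config name `default.production.exception.handler`. Under that name, a user would select one of these handlers by its class name. How the final implementation resolves that value is not settled here.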
We could also add a second implementation for the log-and-move-on strategy, as these are the two most common cases. This class should also be part of the public API (so users can just set it in the config) with a proper name.

Otherwise, I like the KIP a lot! Thanks.

-Matthias

On 11/1/17 12:23 AM, Matt Farmer wrote:
> Thanks for the heads up. Yes, I think my changes are compatible with that
> PR, but there will be a merge conflict that happens whenever one of the PRs
> is merged. Happy to reconcile the changes in my PR if 4148 goes in first. :)
>
> On Tue, Oct 31, 2017 at 6:44 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
>> That sounds reasonable, thanks Matt.
>>
>> As for the implementation, please note that there is another ongoing PR
>> that may touch the same classes that you are working on:
>> https://github.com/apache/kafka/pull/4148
>>
>> So it may help if you can also take a look at that PR and see if it is
>> compatible with your changes.
>>
>> Guozhang
>>
>> On Tue, Oct 31, 2017 at 10:59 AM, Matt Farmer <m...@frmr.me> wrote:
>>
>>> I've opened this pull request to implement the KIP as currently written:
>>> https://github.com/apache/kafka/pull/4165. It still needs some tests added,
>>> but largely represents the shape I was going for.
>>>
>>> If there are more points that folks would like to discuss, please let me
>>> know. If I don't hear anything by tomorrow afternoon I'll probably start a
>>> [VOTE] thread.
>>>
>>> Thanks,
>>> Matt
>>>
>>> On Fri, Oct 27, 2017 at 7:33 PM Matt Farmer <m...@frmr.me> wrote:
>>>
>>>> I can’t think of a reason that would be problematic.
>>>>
>>>> Most of the time I would write a handler like this, I either want to
>>>> ignore the error or fail and bring everything down so that I can spin it
>>>> back up later and resume from earlier offsets. When we start up after
>>>> crashing we’ll eventually try to process the message we failed to produce
>>>> again.
>>>>
>>>> I’m concerned that “putting in a queue for later” opens you up to putting
>>>> messages into the destination topic in an unexpected order. However, if
>>>> others feel differently, I’m happy to talk about it.
>>>>
>>>> On Fri, Oct 27, 2017 at 7:10 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>
>>>>>> Please correct me if I'm wrong, but my understanding is that the record
>>>>>> metadata is always null if an exception occurred while trying to produce.
>>>>>
>>>>> That is right. Thanks.
>>>>>
>>>>> I looked at the example code, and one thing I realized is that since we
>>>>> are not passing the context in the handle function, we may not be able to
>>>>> implement the logic to send the failed records into another queue for
>>>>> future processing. Would people think that would be a big issue?
>>>>>
>>>>> Guozhang
>>>>>
>>>>> On Thu, Oct 26, 2017 at 12:14 PM, Matt Farmer <m...@frmr.me> wrote:
>>>>>
>>>>>> Hello all,
>>>>>>
>>>>>> I've updated the KIP based on this conversation, and made it so that its
>>>>>> interface, config setting, and parameters line up more closely with the
>>>>>> interface in KIP-161 (deserialization handler).
>>>>>>
>>>>>> I believe there are a few specific questions I need to reply to.
>>>>>>
>>>>>>> The question I had about the handle parameters is around the record:
>>>>>>> should it be `ProducerRecord<byte[], byte[]>`, or be generics of
>>>>>>> `ProducerRecord<? extends K, ? extends V>` or `ProducerRecord<? extends
>>>>>>> Object, ? extends Object>`?
>>>>>>
>>>>>> At this point in the code we're guaranteed that this is a
>>>>>> ProducerRecord<byte[], byte[]>, so the generics would just make it harder
>>>>>> to work with the key and value.
>>>>>>
>>>>>>> Also, should the handle function include the `RecordMetadata` as well in
>>>>>>> case it is not null?
>>>>>>
>>>>>> Please correct me if I'm wrong, but my understanding is that the record
>>>>>> metadata is always null if an exception occurred while trying to produce.
>>>>>>
>>>>>>> We should probably try to write down at least the following handling logic
>>>>>>> and see if the given API is sufficient for it
>>>>>>
>>>>>> I've added some examples to the KIP. Let me know what you think.
>>>>>>
>>>>>> Cheers,
>>>>>> Matt
>>>>>>
>>>>>> On Mon, Oct 23, 2017 at 9:00 PM Matt Farmer <m...@frmr.me> wrote:
>>>>>>
>>>>>>> Thanks for this feedback. I’m at a conference right now and am planning on
>>>>>>> updating the KIP again with details from this conversation later this week.
>>>>>>>
>>>>>>> I’ll shoot you a more detailed response then! :)
>>>>>>>
>>>>>>> On Mon, Oct 23, 2017 at 8:16 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks for the KIP Matt.
>>>>>>>>
>>>>>>>> Regarding the handle interface of ProductionExceptionHandlerResponse, could
>>>>>>>> you write it on the wiki also, along with the actual added config names
>>>>>>>> (e.g. what
>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers
>>>>>>>> described).
>>>>>>>>
>>>>>>>> The question I had about the handle parameters is around the record:
>>>>>>>> should it be `ProducerRecord<byte[], byte[]>`, or be generics of
>>>>>>>> `ProducerRecord<? extends K, ? extends V>` or `ProducerRecord<? extends
>>>>>>>> Object, ? extends Object>`?
>>>>>>>>
>>>>>>>> Also, should the handle function include the `RecordMetadata` as well in
>>>>>>>> case it is not null?
>>>>>>>>
>>>>>>>> We should probably try to write down at least the following handling logic
>>>>>>>> and see if the given API is sufficient for it: 1) throw an exception
>>>>>>>> immediately to fail fast and stop the world, 2) log the error, drop the
>>>>>>>> record, and proceed silently, 3) send such errors to a specific "error"
>>>>>>>> Kafka topic, or record them as an app-level metric
>>>>>>>> (https://kafka.apache.org/documentation/#kafka_streams_monitoring) for
>>>>>>>> monitoring.
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>> On Fri, Oct 20, 2017 at 5:47 PM, Matt Farmer <m...@frmr.me> wrote:
>>>>>>>>
>>>>>>>>> I did some more digging tonight.
>>>>>>>>>
>>>>>>>>> @Ted: It looks like the deserialization handler uses
>>>>>>>>> "default.deserialization.exception.handler" for the config name. No
>>>>>>>>> ".class" on the end. I'm inclined to think this should use
>>>>>>>>> "default.production.exception.handler".
>>>>>>>>>
>>>>>>>>> On Fri, Oct 20, 2017 at 8:22 PM Matt Farmer <m...@frmr.me> wrote:
>>>>>>>>>
>>>>>>>>>> Okay, I've dug into this a little bit.
>>>>>>>>>>
>>>>>>>>>> I think getting access to the serialized record is possible, and changing
>>>>>>>>>> the naming and return type is certainly doable. However, because we're
>>>>>>>>>> hooking into the onCompletion callback, we have no guarantee that the
>>>>>>>>>> ProcessorContext state hasn't changed by the time this particular handler
>>>>>>>>>> runs. So I think the signature would change to something like:
>>>>>>>>>>
>>>>>>>>>> ProductionExceptionHandlerResponse handle(final ProducerRecord<..> record,
>>>>>>>>>> final Exception exception)
>>>>>>>>>>
>>>>>>>>>> Would this be acceptable?
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 19, 2017 at 7:33 PM Matt Farmer <m...@frmr.me> wrote:
>>>>>>>>>>
>>>>>>>>>>> Ah good idea. Hmmm. I can line up the naming and return type but I’m not
>>>>>>>>>>> sure if I can get my hands on the context and the record itself without
>>>>>>>>>>> other changes.
>>>>>>>>>>>
>>>>>>>>>>> Let me dig in and follow up here tomorrow.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 19, 2017 at 7:14 PM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the KIP.
>>>>>>>>>>>>
>>>>>>>>>>>> Are you familiar with KIP-161?
>>>>>>>>>>>>
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers
>>>>>>>>>>>>
>>>>>>>>>>>> I think we should align the design (parameter naming, return types,
>>>>>>>>>>>> class names, etc.) of KIP-210 to KIP-161 to get a unified user
>>>>>>>>>>>> experience.
>>>>>>>>>>>>
>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>
>>>>>>>>>>>> On 10/18/17 4:20 PM, Matt Farmer wrote:
>>>>>>>>>>>>> I’ll create the JIRA ticket.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think that config name will work. I’ll update the KIP accordingly.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Oct 18, 2017 at 6:09 PM Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Can you create a JIRA that corresponds to the KIP?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For the new config, how about naming it
>>>>>>>>>>>>>> production.exception.processor.class? This way it is clear that a
>>>>>>>>>>>>>> class name should be specified.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Oct 18, 2017 at 2:40 PM, Matt Farmer <m...@frmr.me> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hello everyone,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This is the discussion thread for the KIP that I just filed here:
>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Looking forward to getting some feedback from folks about this idea and
>>>>>>>>>>>>>>> working toward a solution we can contribute back. :)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>> Matt Farmer
>>>>>>>>
>>>>>>>> --
>>>>>>>> -- Guozhang
>>>>>
>>>>> --
>>>>> -- Guozhang
>>
>> --
>> -- Guozhang