I've opened this pull request to implement the KIP as currently written:
https://github.com/apache/kafka/pull/4165. It still needs some tests added,
but largely represents the shape I was going for.

If there are more points that folks would like to discuss, please let me
know. If I don't hear anything by tomorrow afternoon I'll probably start a
[VOTE] thread.

Thanks,
Matt

On Fri, Oct 27, 2017 at 7:33 PM Matt Farmer <m...@frmr.me> wrote:

> I can’t think of a reason that would be problematic.
>
> Most of the time I would write a handler like this, I either want to
> ignore the error or fail and bring everything down so that I can spin it
> back up later and resume from earlier offsets. When we start up after
> crashing we’ll eventually try to process the message we failed to produce
> again.
>
> I’m concerned that “putting in a queue for later” opens you up to putting
> messages into the destination topic in an unexpected order. However if
> others feel differently, I’m happy to talk about it.
>
> On Fri, Oct 27, 2017 at 7:10 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
>> > Please correct me if I'm wrong, but my understanding is that the record
>> > metadata is always null if an exception occurred while trying to
>> produce.
>>
>> That is right. Thanks.
>>
>> I looked at the example code, and one thing I realized that since we are
>> not passing the context in the handle function, we may not be implement
>> the
>> logic to send the fail records into another queue for future processing.
>> Would people think that would be a big issue?
>>
>>
>> Guozhang
>>
>>
>> On Thu, Oct 26, 2017 at 12:14 PM, Matt Farmer <m...@frmr.me> wrote:
>>
>> > Hello all,
>> >
>> > I've updated the KIP based on this conversation, and made it so that its
>> > interface, config setting, and parameters line up more closely with the
>> > interface in KIP-161 (deserialization handler).
>> >
>> > I believe there are a few specific questions I need to reply to.....
>> >
>> > > The question I had about then handle parameters are around the record,
>> > > should it be `ProducerRecord<byte[], byte[]>`, or be generics of
>> > > `ProducerRecord<? extends K, ? extends V>` or `ProducerRecord<?
>> extends
>> > > Object, ? extends Object>`?
>> >
>> > At this point in the code we're guaranteed that this is a
>> > ProducerRecord<byte[], byte[]>, so the generics would just make it
>> harder
>> > to work with the key and value.
>> >
>> > > Also, should the handle function include the `RecordMetadata` as well
>> in
>> > > case it is not null?
>> >
>> > Please correct me if I'm wrong, but my understanding is that the record
>> > metadata is always null if an exception occurred while trying to
>> produce.
>> >
>> > > We may probably try to write down at least the following handling
>> logic
>> > and
>> > > see if the given API is sufficient for it
>> >
>> > I've added some examples to the KIP. Let me know what you think.
>> >
>> > Cheers,
>> > Matt
>> >
>> > On Mon, Oct 23, 2017 at 9:00 PM Matt Farmer <m...@frmr.me> wrote:
>> >
>> > > Thanks for this feedback. I’m at a conference right now and am
>> planning
>> > on
>> > > updating the KIP again with details from this conversation later this
>> > week.
>> > >
>> > > I’ll shoot you a more detailed response then! :)
>> > > On Mon, Oct 23, 2017 at 8:16 PM Guozhang Wang <wangg...@gmail.com>
>> > wrote:
>> > >
>> > >> Thanks for the KIP Matt.
>> > >>
>> > >> Regarding the handle interface of ProductionExceptionHandlerResponse,
>> > >> could
>> > >> you write it on the wiki also, along with the actual added config
>> names
>> > >> (e.g. what
>> > >>
>> > >>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+
>> > deserialization+exception+handlers
>> > >> described).
>> > >>
>> > >> The question I had about then handle parameters are around the
>> record,
>> > >> should it be `ProducerRecord<byte[], byte[]>`, or be generics of
>> > >> `ProducerRecord<? extends K, ? extends V>` or `ProducerRecord<?
>> extends
>> > >> Object, ? extends Object>`?
>> > >>
>> > >> Also, should the handle function include the `RecordMetadata` as
>> well in
>> > >> case it is not null?
>> > >>
>> > >> We may probably try to write down at least the following handling
>> logic
>> > >> and
>> > >> see if the given API is sufficient for it: 1) throw exception
>> > immediately
>> > >> to fail fast and stop the world, 2) log the error and drop record and
>> > >> proceed silently, 3) send such errors to a specific "error" Kafka
>> topic,
>> > >> or
>> > >> record it as an app-level metrics (
>> > >> https://kafka.apache.org/documentation/#kafka_streams_monitoring)
>> for
>> > >> monitoring.
>> > >>
>> > >> Guozhang
>> > >>
>> > >>
>> > >>
>> > >> On Fri, Oct 20, 2017 at 5:47 PM, Matt Farmer <m...@frmr.me> wrote:
>> > >>
>> > >> > I did some more digging tonight.
>> > >> >
>> > >> > @Ted: It looks like the deserialization handler uses
>> > >> > "default.deserialization.exception.handler" for the config name. No
>> > >> > ".class" on the end. I'm inclined to think this should use
>> > >> > "default.production.exception.handler".
>> > >> >
>> > >> > On Fri, Oct 20, 2017 at 8:22 PM Matt Farmer <m...@frmr.me> wrote:
>> > >> >
>> > >> > > Okay, I've dug into this a little bit.
>> > >> > >
>> > >> > > I think getting access to the serialized record is possible, and
>> > >> changing
>> > >> > > the naming and return type is certainly doable. However, because
>> > we're
>> > >> > > hooking into the onCompletion callback we have no guarantee that
>> the
>> > >> > > ProcessorContext state hasn't changed by the time this particular
>> > >> handler
>> > >> > > runs. So I think the signature would change to something like:
>> > >> > >
>> > >> > > ProductionExceptionHandlerResponse handle(final
>> ProducerRecord<..>
>> > >> > record,
>> > >> > > final Exception exception)
>> > >> > >
>> > >> > > Would this be acceptable?
>> > >> > >
>> > >> > > On Thu, Oct 19, 2017 at 7:33 PM Matt Farmer <m...@frmr.me>
>> wrote:
>> > >> > >
>> > >> > >> Ah good idea. Hmmm. I can line up the naming and return type but
>> > I’m
>> > >> not
>> > >> > >> sure if I can get my hands on the context and the record itself
>> > >> without
>> > >> > >> other changes.
>> > >> > >>
>> > >> > >> Let me dig in and follow up here tomorrow.
>> > >> > >> On Thu, Oct 19, 2017 at 7:14 PM Matthias J. Sax <
>> > >> matth...@confluent.io>
>> > >> > >> wrote:
>> > >> > >>
>> > >> > >>> Thanks for the KIP.
>> > >> > >>>
>> > >> > >>> Are you familiar with KIP-161?
>> > >> > >>>
>> > >> > >>>
>> > >> > >>>
>> > >>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+
>> > >> > deserialization+exception+handlers
>> > >> > >>>
>> > >> > >>> I thinks, we should align the design (parameter naming, return
>> > >> types,
>> > >> > >>> class names etc) of KIP-210 to KIP-161 to get a unified user
>> > >> > experience.
>> > >> > >>>
>> > >> > >>>
>> > >> > >>>
>> > >> > >>> -Matthias
>> > >> > >>>
>> > >> > >>>
>> > >> > >>> On 10/18/17 4:20 PM, Matt Farmer wrote:
>> > >> > >>> > I’ll create the JIRA ticket.
>> > >> > >>> >
>> > >> > >>> > I think that config name will work. I’ll update the KIP
>> > >> accordingly.
>> > >> > >>> > On Wed, Oct 18, 2017 at 6:09 PM Ted Yu <yuzhih...@gmail.com>
>> > >> wrote:
>> > >> > >>> >
>> > >> > >>> >> Can you create JIRA that corresponds to the KIP ?
>> > >> > >>> >>
>> > >> > >>> >> For the new config, how about naming it
>> > >> > >>> >> production.exception.processor.class
>> > >> > >>> >> ? This way it is clear that class name should be specified.
>> > >> > >>> >>
>> > >> > >>> >> Cheers
>> > >> > >>> >>
>> > >> > >>> >> On Wed, Oct 18, 2017 at 2:40 PM, Matt Farmer <m...@frmr.me>
>> > >> wrote:
>> > >> > >>> >>
>> > >> > >>> >>> Hello everyone,
>> > >> > >>> >>>
>> > >> > >>> >>> This is the discussion thread for the KIP that I just filed
>> > >> here:
>> > >> > >>> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> > >> > >>> >>> 210+-+Provide+for+custom+error+handling++when+Kafka+
>> > >> > >>> >>> Streams+fails+to+produce
>> > >> > >>> >>>
>> > >> > >>> >>> Looking forward to getting some feedback from folks about
>> this
>> > >> idea
>> > >> > >>> and
>> > >> > >>> >>> working toward a solution we can contribute back. :)
>> > >> > >>> >>>
>> > >> > >>> >>> Cheers,
>> > >> > >>> >>> Matt Farmer
>> > >> > >>> >>>
>> > >> > >>> >>
>> > >> > >>> >
>> > >> > >>>
>> > >> > >>>
>> > >> >
>> > >>
>> > >>
>> > >>
>> > >> --
>> > >> -- Guozhang
>> > >>
>> > >
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>

Reply via email to