One more comment.

You mention a default implementation for the handler that fails. I
think this should be part of the public API and thus should have a
properly defined name that is mentioned in the KIP.

We could also add a second implementation for the log-and-move-on
strategy, as these are the two most common cases. This class should also
be part of the public API (so users can just set it in the config) with a
proper name.
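
To make the suggestion concrete, here is a rough sketch of what the two
built-in handlers might look like. Everything in it is an assumption for
illustration only: the class names, the CONTINUE/FAIL response values, and
the simplified handle() signature (raw key/value bytes instead of a
ProducerRecord<byte[], byte[]>) are placeholders, not what the KIP or the PR
defines.

```java
// Hypothetical response type, modeled loosely on the KIP-161 style.
enum ProductionExceptionHandlerResponse { CONTINUE, FAIL }

// Hypothetical, simplified handler interface. The real one would receive a
// ProducerRecord<byte[], byte[]>; raw arrays keep this sketch self-contained.
interface ProductionExceptionHandler {
    ProductionExceptionHandlerResponse handle(byte[] key, byte[] value, Exception exception);
}

// Fail-fast strategy (the proposed default): always ask the caller to stop.
class AlwaysFailProductionExceptionHandler implements ProductionExceptionHandler {
    @Override
    public ProductionExceptionHandlerResponse handle(byte[] key, byte[] value, Exception exception) {
        return ProductionExceptionHandlerResponse.FAIL;
    }
}

// Log-and-move-on strategy: report the error, then ask the caller to continue.
class LogAndContinueProductionExceptionHandler implements ProductionExceptionHandler {
    @Override
    public ProductionExceptionHandlerResponse handle(byte[] key, byte[] value, Exception exception) {
        System.err.println("Failed to produce record, skipping it: " + exception);
        return ProductionExceptionHandlerResponse.CONTINUE;
    }
}
```

With named classes like these, a user could pick either strategy by putting
the class name in the config instead of writing their own handler.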


Otherwise, I like the KIP a lot! Thanks.


-Matthias


On 11/1/17 12:23 AM, Matt Farmer wrote:
> Thanks for the heads up. Yes, I think my changes are compatible with that
> PR, but there will be a merge conflict that happens whenever one of the PRs
> is merged. Happy to reconcile the changes in my PR if 4148 goes in first. :)
> 
> On Tue, Oct 31, 2017 at 6:44 PM Guozhang Wang <wangg...@gmail.com> wrote:
> 
>> That sounds reasonable, thanks Matt.
>>
>> As for the implementation, please note that there is another ongoing PR
>> that may touch the same classes that you are working on:
>> https://github.com/apache/kafka/pull/4148
>>
>> So it may help if you can also take a look at that PR and see if it is
>> compatible with your changes.
>>
>>
>>
>> Guozhang
>>
>>
>> On Tue, Oct 31, 2017 at 10:59 AM, Matt Farmer <m...@frmr.me> wrote:
>>
>>> I've opened this pull request to implement the KIP as currently written:
>>> https://github.com/apache/kafka/pull/4165. It still needs some tests
>>> added, but largely represents the shape I was going for.
>>>
>>> If there are more points that folks would like to discuss, please let me
>>> know. If I don't hear anything by tomorrow afternoon I'll probably start
>>> a [VOTE] thread.
>>>
>>> Thanks,
>>> Matt
>>>
>>> On Fri, Oct 27, 2017 at 7:33 PM Matt Farmer <m...@frmr.me> wrote:
>>>
>>>> I can’t think of a reason that would be problematic.
>>>>
>>>> Most of the time when I write a handler like this, I either want to
>>>> ignore the error or fail and bring everything down so that I can spin it
>>>> back up later and resume from earlier offsets. When we start up after
>>>> crashing we’ll eventually try to process the message we failed to produce
>>>> again.
>>>>
>>>> I’m concerned that “putting in a queue for later” opens you up to putting
>>>> messages into the destination topic in an unexpected order. However, if
>>>> others feel differently, I’m happy to talk about it.
>>>>
>>>> On Fri, Oct 27, 2017 at 7:10 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>
>>>>>> Please correct me if I'm wrong, but my understanding is that the record
>>>>>> metadata is always null if an exception occurred while trying to produce.
>>>>>
>>>>> That is right. Thanks.
>>>>>
>>>>> I looked at the example code, and one thing I realized is that since we are
>>>>> not passing the context into the handle function, we may not be able to
>>>>> implement the logic to send the failed records into another queue for
>>>>> future processing. Would people think that would be a big issue?
>>>>>
>>>>>
>>>>> Guozhang
>>>>>
>>>>>
>>>>> On Thu, Oct 26, 2017 at 12:14 PM, Matt Farmer <m...@frmr.me> wrote:
>>>>>
>>>>>> Hello all,
>>>>>>
>>>>>> I've updated the KIP based on this conversation, and made it so that its
>>>>>> interface, config setting, and parameters line up more closely with the
>>>>>> interface in KIP-161 (deserialization handler).
>>>>>>
>>>>>> I believe there are a few specific questions I need to reply to.....
>>>>>>
>>>>>>> The question I had about the handle parameters is around the record:
>>>>>>> should it be `ProducerRecord<byte[], byte[]>`, or be generics of
>>>>>>> `ProducerRecord<? extends K, ? extends V>` or
>>>>>>> `ProducerRecord<? extends Object, ? extends Object>`?
>>>>>>
>>>>>> At this point in the code we're guaranteed that this is a
>>>>>> ProducerRecord<byte[], byte[]>, so the generics would just make it harder
>>>>>> to work with the key and value.
>>>>>>
>>>>>>> Also, should the handle function include the `RecordMetadata` as well in
>>>>>>> case it is not null?
>>>>>>
>>>>>> Please correct me if I'm wrong, but my understanding is that the record
>>>>>> metadata is always null if an exception occurred while trying to produce.
>>>>>>
>>>>>>> We should probably try to write down at least the following handling
>>>>>>> logic and see if the given API is sufficient for it
>>>>>>
>>>>>> I've added some examples to the KIP. Let me know what you think.
>>>>>>
>>>>>> Cheers,
>>>>>> Matt
>>>>>>
>>>>>> On Mon, Oct 23, 2017 at 9:00 PM Matt Farmer <m...@frmr.me> wrote:
>>>>>>
>>>>>>> Thanks for this feedback. I’m at a conference right now and am planning on
>>>>>>> updating the KIP again with details from this conversation later this week.
>>>>>>>
>>>>>>> I’ll shoot you a more detailed response then! :)
>>>>>>> On Mon, Oct 23, 2017 at 8:16 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks for the KIP Matt.
>>>>>>>>
>>>>>>>> Regarding the handle interface of ProductionExceptionHandlerResponse,
>>>>>>>> could you write it on the wiki also, along with the actual added config
>>>>>>>> names (e.g. what
>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers
>>>>>>>> described).
>>>>>>>>
>>>>>>>> The question I had about the handle parameters is around the record:
>>>>>>>> should it be `ProducerRecord<byte[], byte[]>`, or be generics of
>>>>>>>> `ProducerRecord<? extends K, ? extends V>` or
>>>>>>>> `ProducerRecord<? extends Object, ? extends Object>`?
>>>>>>>>
>>>>>>>> Also, should the handle function include the `RecordMetadata` as well in
>>>>>>>> case it is not null?
>>>>>>>>
>>>>>>>> We should probably try to write down at least the following handling
>>>>>>>> logic and see if the given API is sufficient for it: 1) throw an
>>>>>>>> exception immediately to fail fast and stop the world, 2) log the error,
>>>>>>>> drop the record, and proceed silently, 3) send such errors to a specific
>>>>>>>> "error" Kafka topic, or record them as app-level metrics
>>>>>>>> (https://kafka.apache.org/documentation/#kafka_streams_monitoring) for
>>>>>>>> monitoring.
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Oct 20, 2017 at 5:47 PM, Matt Farmer <m...@frmr.me> wrote:
>>>>>>>>
>>>>>>>>> I did some more digging tonight.
>>>>>>>>>
>>>>>>>>> @Ted: It looks like the deserialization handler uses
>>>>>>>>> "default.deserialization.exception.handler" for the config name. No
>>>>>>>>> ".class" on the end. I'm inclined to think this should use
>>>>>>>>> "default.production.exception.handler".
>>>>>>>>>
>>>>>>>>> On Fri, Oct 20, 2017 at 8:22 PM Matt Farmer <m...@frmr.me> wrote:
>>>>>>>>>
>>>>>>>>>> Okay, I've dug into this a little bit.
>>>>>>>>>>
>>>>>>>>>> I think getting access to the serialized record is possible, and
>>>>>>>>>> changing the naming and return type is certainly doable. However,
>>>>>>>>>> because we're hooking into the onCompletion callback we have no
>>>>>>>>>> guarantee that the ProcessorContext state hasn't changed by the time
>>>>>>>>>> this particular handler runs. So I think the signature would change to
>>>>>>>>>> something like:
>>>>>>>>>>
>>>>>>>>>> ProductionExceptionHandlerResponse handle(final ProducerRecord<..> record,
>>>>>>>>>>     final Exception exception)
>>>>>>>>>>
>>>>>>>>>> Would this be acceptable?
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 19, 2017 at 7:33 PM Matt Farmer <m...@frmr.me> wrote:
>>>>>>>>>>
>>>>>>>>>>> Ah good idea. Hmmm. I can line up the naming and return type but I’m
>>>>>>>>>>> not sure if I can get my hands on the context and the record itself
>>>>>>>>>>> without other changes.
>>>>>>>>>>>
>>>>>>>>>>> Let me dig in and follow up here tomorrow.
>>>>>>>>>>> On Thu, Oct 19, 2017 at 7:14 PM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the KIP.
>>>>>>>>>>>>
>>>>>>>>>>>> Are you familiar with KIP-161?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers
>>>>>>>>>>>>
>>>>>>>>>>>> I think we should align the design (parameter naming, return types,
>>>>>>>>>>>> class names etc.) of KIP-210 to KIP-161 to get a unified user
>>>>>>>>>>>> experience.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 10/18/17 4:20 PM, Matt Farmer wrote:
>>>>>>>>>>>>> I’ll create the JIRA ticket.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think that config name will work. I’ll update the KIP accordingly.
>>>>>>>>>>>>> On Wed, Oct 18, 2017 at 6:09 PM Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Can you create a JIRA that corresponds to the KIP?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For the new config, how about naming it
>>>>>>>>>>>>>> production.exception.processor.class? This way it is clear that a
>>>>>>>>>>>>>> class name should be specified.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Oct 18, 2017 at 2:40 PM, Matt Farmer <m...@frmr.me> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hello everyone,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This is the discussion thread for the KIP that I just filed here:
>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Looking forward to getting some feedback from folks about this idea
>>>>>>>>>>>>>>> and working toward a solution we can contribute back. :)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>> Matt Farmer
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> -- Guozhang
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> -- Guozhang
>>>>>
>>>>
>>>
>>
>>
>>
>> --
>> -- Guozhang
>>
> 
