Hi all,

Thanks a lot for the constructive feedback!



Addressing some of the main concerns:


- The `RecordTooLargeException` can be thrown by the broker, the producer, and
the consumer. The `ProducerExceptionHandler` interface is, of course, introduced
to affect only the exceptions thrown from the producer. This KIP very
specifically aims to provide a way to manage the `RecordTooLargeException`
thrown from the Producer.send() method. Please see the “Proposed Changes”
section for more clarity; I investigated the issue there thoroughly, and I hope
it also addresses the concern about how we handle the errors.



- The problem with Callback: the Callback methods are invoked once the record
sent to the server has been acknowledged, which is not the desired point in
time for all exceptions. We intend to handle some exceptions before that.


- What if the custom handler returns RETRY for `RecordTooLargeException`? I
assume changing the producer configuration at runtime is possible. If so, RETRY
for a too-large record is valid, because on the next try the record might no
longer be too large. I am not 100% sure about the technical details, though.
Otherwise, we can treat RETRY as FAIL for this exception. Another option would
be to allow a fixed number of retries, which could be useful for other
exceptions as well.
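
Just to sketch what “treat RETRY as FAIL for this exception” could look like,
assuming the handle(record, exception) signature and the FAIL/RETRY/SWALLOW
response values from the current draft (the full interface shape is sketched
under “Small change” below; names may still change):

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.RecordTooLargeException;
    import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

    public class ConservativeHandler implements ProducerExceptionHandler {
        @Override
        public Response handle(ProducerRecord<byte[], byte[]> record, Exception exception) {
            if (exception instanceof RecordTooLargeException) {
                // Retrying cannot shrink the record, so map any RETRY intent to FAIL.
                return Response.FAIL;
            }
            if (exception instanceof UnknownTopicOrPartitionException) {
                // The topic may simply not have propagated to all brokers yet.
                return Response.RETRY;
            }
            // Keep today's behaviour for everything else.
            return Response.FAIL;
        }
    }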


- What if the handle() method itself throws an exception? I think, rationally
and pragmatically, the behaviour must be exactly the same as when no custom
handler is defined, since the user effectively did not provide a working
handler.
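
In other words, the producer could guard the handler call roughly like this
(purely illustrative of the intended semantics, not actual producer code):

    // Purely illustrative -- not actual producer internals.
    private ProducerExceptionHandler.Response handleSafely(ProducerExceptionHandler handler,
                                                            ProducerRecord<byte[], byte[]> record,
                                                            Exception exception) {
        try {
            return handler.handle(record, exception);
        } catch (RuntimeException handlerFailure) {
            // A throwing handler behaves as if no handler was configured:
            // fall back to today's fail behaviour.
            return ProducerExceptionHandler.Response.FAIL;
        }
    }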


- Why not use config parameters instead of an interface? As explained in the
“Rejected Alternatives” section, we expect the handler to be used for a growing
number of exceptions in the future. Defining a configuration parameter for each
exception could make the configuration messy. Moreover, the handler offers more
flexibility, as the sketch below illustrates.
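
To illustrate the flexibility point (and the per-topic question raised in the
thread), here is a hedged sketch; the topic name is made up, and it again
assumes the draft handle() signature and Response values:

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.RecordTooLargeException;

    public class PerTopicHandler implements ProducerExceptionHandler {
        @Override
        public Response handle(ProducerRecord<byte[], byte[]> record, Exception exception) {
            // Drop oversized records only for a low-value, high-volume topic;
            // keep the current fail-fast behaviour everywhere else. A flat config
            // flag per exception could not express this per-topic decision.
            if (exception instanceof RecordTooLargeException
                    && "debug-metrics".equals(record.topic())) {
                return Response.SWALLOW;
            }
            return Response.FAIL;
        }
    }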



Small change:

- ProducerExceptionHandlerResponse -> Response, for brevity and simplicity. It
could have been HandlerResponse too, I think!
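
For reference, with that rename the interface would look roughly like the
following. Treat it as a sketch of the current draft only; the exact enum
constants are my reading of the discussion, and whether the interface also
extends Configurable/Closeable is left out here:

    import org.apache.kafka.clients.producer.ProducerRecord;

    public interface ProducerExceptionHandler {

        /** Decide how the producer should react to the given exception for the given record. */
        Response handle(ProducerRecord<byte[], byte[]> record, Exception exception);

        enum Response {
            FAIL,     // surface the error to the application, as today
            RETRY,    // keep retrying internally
            SWALLOW   // drop the record and continue
        }
    }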


I thank you all again for your useful questions/suggestions.

I would be happy to hear any further concerns, as some of the feedback hinted
there may be more.

Cheers,
Alieh

On Wed, Apr 24, 2024 at 12:31 AM Justine Olshan
<jols...@confluent.io.invalid> wrote:

> Thanks Alieh for the updates.
>
> I'm a little concerned about the design pattern here. It seems like we want
> specific usages, but we are packaging it as a generic handler.
> I think we tried to narrow down on the specific errors we want to handle,
> but it feels a little clunky as we have a generic thing for two specific
> errors.
>
> I'm wondering if we are using the right patterns to solve these problems. I
> agree though that we will need something more than the error classes I'm
> proposing if we want to have different handling be configurable.
> My concern is that the open-endedness of a handler means that we are
> creating more problems than we are solving. It is still unclear to me how
> we expect to handle the errors. Perhaps we could include an example? It
> seems like there is a specific use case in mind and maybe we can make a
> design that is tighter and supports that case.
>
> Justine
>
> On Tue, Apr 23, 2024 at 3:06 PM Kirk True <k...@kirktrue.pro> wrote:
>
> > Hi Alieh,
> >
> > Thanks for the KIP!
> >
> > A few questions:
> >
> > K1. What is the expected behavior for the producer if it generates a
> > RecordTooLargeException, but the handler returns RETRY?
> > K2. How do we determine which Record was responsible for the
> > UnknownTopicOrPartitionException since we get that response when
> sending  a
> > batch of records?
> > K3. What is the expected behavior if the handle() method itself throws an
> > error?
> > K4. What is the downside of adding an onError() method to the Producer’s
> > Callback interface vs. a new mechanism?
> > K5. Can we change “ProducerExceptionHandlerResponse" to just “Response”
> > given that it’s an inner enum?
> > K6. Any recommendation for callback authors to handle different behavior
> > for different topics?
> >
> > I’ll echo what others have said, it would help me understand why we want
> > another handler class if there were more examples in the Motivation
> > section. As it stands now, I agree with Chris that the stated issues
> could
> > be solved by adding two new configuration options:
> >
> >     oversized.record.behavior=fail
> >     retry.on.unknown.topic.or.partition=true
> >
> > What I’m not yet able to wrap my head around is: what exactly would the
> > logic in the handler be? I’m not very imaginative, so I’m assuming they’d
> > mostly be if-this-then-that. However, if they’re more complicated, I’d
> have
> > other concerns.
> >
> > Thanks,
> > Kirk
> >
> > > On Apr 22, 2024, at 7:38 AM, Alieh Saeedi <asae...@confluent.io.INVALID
> >
> > wrote:
> > >
> > > Thank you all for the feedback!
> > >
> > > Addressing the main concern: The KIP is about giving the user the
> ability
> > > to handle producer exceptions, but to be more conservative and avoid
> > future
> > > issues, we decided to limit it to a short list of exceptions. I
> > included
> > > *RecordTooLargeException* and *UnknownTopicOrPartitionException*. Open
> to
> > > suggestion for adding some more ;-)
> > >
> > > KIP Updates:
> > > - clarified the way that the user should configure the Producer to use
> > the
> > > custom handler. I think adding a producer config property is the
> cleanest
> > > one.
> > > - changed the *ClientExceptionHandler* to *ProducerExceptionHandler* to
> > be
> > > closer to what we are changing.
> > > - added the ProducerRecord as the input parameter of the handle()
> method
> > as
> > > well.
> > > - increased the response types to 3 to have fail and two types of
> > continue.
> > > - The default behaviour is having no custom handler, having the
> > > corresponding config parameter set to null. Therefore, the KIP provides
> > no
> > > default implementation of the interface.
> > > - We follow the interface solution as described in the
> > > Rejected Alternatives section.
> > >
> > >
> > > Cheers,
> > > Alieh
> > >
> > >
> > > On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax <mj...@apache.org>
> > wrote:
> > >
> > >> Thanks for the KIP Alieh! It addresses an important case for error
> > >> handling.
> > >>
> > >> I agree that using this handler would be an expert API, as mentioned
> by
> > >> a few people. But I don't think it would be a reason to not add it.
> It's
> > >> always a tricky tradeoff what to expose to users and to avoid foot
> guns,
> > >> but we added similar handlers to Kafka Streams, and have good
> experience
> > >> with it. Hence, I understand, but don't share the concern raised.
> > >>
> > >> I also agree that there is some responsibility by the user to
> understand
> > >> how such a handler should be implemented to not drop data by accident.
> > >> But it seems unavoidable and acceptable.
> > >>
> > >> While I understand that a "simpler / reduced" API (eg via configs)
> might
> > >> also work, I personally prefer a full handler. Configs have the same
> > >> issue that they could be misused, potentially leading to incorrectly
> > >> dropped data, but at the same time are less flexible (and thus maybe
> > >> even harder to use correctly...?). Based on my experience, there are
> also
> > >> often weird corner cases for which it makes sense to also drop records
> for
> > >> other exceptions, and a full handler has the advantage of full
> > >> flexibility and "absolute power!".
> > >>
> > >> To be fair: I don't know the exact code paths of the producer in
> > >> detail, so please keep me honest. But my understanding is that the
> KIP
> > >> aims to allow users to react to internal exceptions, and decide to keep
> > >> retrying internally, swallow the error and drop the record, or raise
> the
> > >> error?
> > >>
> > >> Maybe the KIP would need to be a little bit more precise about which errors
> we
> > >> want to cover -- I don't think this list must be exhaustive, as we can
> > >> always do follow up KIP to also apply the handler to other errors to
> > >> expand the scope of the handler. The KIP does mention examples, but it
> > >> might be good to explicitly state for what cases the handler gets
> > applied?
> > >>
> > >> I am also not sure if CONTINUE and FAIL are enough options? Don't we
> > >> need three options? Or would `CONTINUE` have a different meaning
> depending
> > >> on the type of error? Ie, for a retryable error `CONTINUE` would mean
> > >> keep retrying internally, but for a non-retryable error `CONTINUE`
> means
> > >> swallow the error and drop the record? This semantic overload seems
> > >> tricky for users to reason about, so it might be better to split
> `CONTINUE`
> > >> into two cases -> `RETRY` and `SWALLOW` (or some better names).
> > >>
> > >> Additionally, should we just ship a `DefaultClientExceptionHandler`
> > >> which would return `FAIL`, for backward compatibility? Or not have
> any
> > >> default handler to begin with and allow it to be `null`? I don't see
> the
> > >> need for a specific `TransactionExceptionHandler`. To me, the goal
> > >> should be to not modify the default behavior at all, but to just allow
> > >> users to change the default behavior if there is a need.
> > >>
> > >> What is missing in the KIP, though, is how the handler is passed into
> the
> > >> producer. Would we need a new config which allows setting a
> > >> custom handler? And/or would we allow passing in an instance via the
> > >> constructor or add a new method to set a handler?
> > >>
> > >>
> > >> -Matthias
> > >>
> > >> On 4/18/24 10:02 AM, Andrew Schofield wrote:
> > >>> Hi Alieh,
> > >>> Thanks for the KIP.
> > >>>
> > >>> Exception handling in the Kafka producer and consumer is really not
> > >> ideal.
> > >>> It’s even harder working out what’s going on with the consumer.
> > >>>
> > >>> I’m a bit nervous about this KIP and I agree with Chris that it could
> > do
> > >> with additional
> > >>> motivation. This would be an expert-level interface given how
> > complicated
> > >>> the exception handling for Kafka has become.
> > >>>
> > >>> 7. The application is not really aware of the batching being done on
> > its
> > >> behalf.
> > >>> The ProduceResponse can actually return an array of records which
> > failed
> > >>> per batch. If you get RecordTooLargeException, and want to retry, you
> > >> probably
> > >>> need to remove the offending records from the batch and retry it.
> This
> > >> is getting fiddly.
> > >>>
> > >>> 8. There is already o.a.k.clients.producer.Callback. I wonder whether
> > an
> > >>> alternative might be to add a method to the existing Callback
> > interface,
> > >> such as:
> > >>>
> > >>>   ClientExceptionResponse onException(Exception exception)
> > >>>
> > >>> It would be called when a ProduceResponse contains an error, but the
> > >>> producer is going to retry. It tells the producer whether to go ahead
> > >> with the retry
> > >>> or not. The default implementation would be to CONTINUE, because
> that’s
> > >>> just continuing to retry as planned. Note that this is a per-record
> > >> callback, so
> > >>> the application would be able to understand which records failed.
> > >>>
> > >>> By using an existing interface, we already know how to configure it
> and
> > >> we know
> > >>> about the threading model for calling it.
> > >>>
> > >>>
> > >>> Thanks,
> > >>> Andrew
> > >>>
> > >>>
> > >>>
> > >>>> On 17 Apr 2024, at 18:17, Chris Egerton <chr...@aiven.io.INVALID>
> > >> wrote:
> > >>>>
> > >>>> Hi Alieh,
> > >>>>
> > >>>> Thanks for the KIP! The issue with writing to non-existent topics is
> > >>>> particularly frustrating for users of Kafka Connect and has been the
> > >> source
> > >>>> of a handful of Jira tickets over the years. My thoughts:
> > >>>>
> > >>>> 1. An additional detail we can add to the motivation (or possibly
> > >> rejected
> > >>>> alternatives) section is that this kind of custom retry logic can't
> be
> > >>>> implemented by hand by, e.g., setting retries to 0 in the producer
> > >> config
> > >>>> and handling exceptions at the application level. Or rather, it can,
> > >> but 1)
> > >>>> it's a bit painful to have to reimplement at every call-site for
> > >>>> Producer::send (and any code that awaits the returned Future) and 2)
> > >> it's
> > >>>> impossible to do this without losing idempotency on retries.
> > >>>>
> > >>>> 2. That said, I wonder if a pluggable interface is really the right
> > call
> > >>>> here. Documenting the interactions of a producer with
> > >>>> a ClientExceptionHandler instance will be tricky, and implementing
> > them
> > >>>> will also be a fair amount of work. I believe that there needs to be
> > >> some
> > >>>> more granularity for how writes to non-existent topics (or really,
> > >>>> UNKNOWN_TOPIC_OR_PARTITION and related errors from the broker) are
> > >> handled,
> > >>>> but I'm torn between keeping it simple with maybe one or two new
> > >> producer
> > >>>> config properties, or a full-blown pluggable interface. If there are
> > >> more
> > >>>> cases that would benefit from a pluggable interface, it would be
> nice
> > to
> > >>>> identify these and add them to the KIP to strengthen the motivation.
> > >> Right
> > >>>> now, I'm not sure the two that are mentioned in the motivation are
> > >>>> sufficient.
> > >>>>
> > >>>> 3. Alternatively, a possible compromise is for this KIP to introduce
> > new
> > >>>> properties that dictate how to handle unknown-topic-partition and
> > >>>> record-too-large errors, with the thinking that if we introduce a
> > >> pluggable
> > >>>> interface later on, these properties will be recognized by the
> default
> > >>>> implementation of that interface but could be completely ignored or
> > >>>> replaced by alternative implementations.
> > >>>>
> > >>>> 4. (Nit) You can remove the "This page is meant as a template for
> > >> writing a
> > >>>> KIP..." part from the KIP. It's not a template anymore :)
> > >>>>
> > >>>> 5. If we do go the pluggable interface route, wouldn't we want to
> add
> > >> the
> > >>>> possibility for retry logic? The simplest version of this could be
> to
> > >> add a
> > >>>> RETRY value to the ClientExceptionHandlerResponse enum.
> > >>>>
> > >>>> 6. I think "SKIP" or "DROP" might be clearer instead of "CONTINUE"
> for
> > >>>> the ClientExceptionHandlerResponse enum, since they cause records to
> > be
> > >>>> dropped.
> > >>>>
> > >>>> Cheers,
> > >>>>
> > >>>> Chris
> > >>>>
> > >>>> On Wed, Apr 17, 2024 at 12:25 PM Justine Olshan
> > >>>> <jols...@confluent.io.invalid> wrote:
> > >>>>
> > >>>>> Hey Alieh,
> > >>>>>
> > >>>>> I echo what Omnia says, I'm not sure I understand the implications
> of
> > >> the
> > >>>>> change and I think more detail is needed.
> > >>>>>
> > >>>>> This comment also confused me a bit:
> > >>>>> * {@code ClientExceptionHandler} that continues the transaction
> even
> > >> if a
> > >>>>> record is too large.
> > >>>>> * Otherwise, it makes the transaction to fail.
> > >>>>>
> > >>>>> Relatedly, I've been working with some folks on a KIP for
> > transactions
> > >>>>> errors and how they are handled. Specifically for the
> > >>>>> RecordTooLargeException (and a few other errors), we want to give a
> > new
> > >>>>> error category for this error that allows the application to choose
> > >> how it
> > >>>>> is handled. Maybe this KIP is something that you are looking for?
> > Stay
> > >>>>> tuned :)
> > >>>>>
> > >>>>> Justine
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> On Wed, Apr 17, 2024 at 8:03 AM Omnia Ibrahim <
> > o.g.h.ibra...@gmail.com
> > >>>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> Hi Alieh,
> > >>>>>> Thanks for the KIP! I have a couple of comments
> > >>>>>> - You mentioned in the KIP motivation,
> > >>>>>>> Another example for which a production exception handler could be
> > >>>>> useful
> > >>>>>> is if a user tries to write into a non-existing topic, which
> > returns a
> > >>>>>> retryable error code; with infinite retries, the producer would
> hang
> > >>>>>> retrying forever. A handler could help to break the infinite retry
> > >> loop.
> > >>>>>>
> > >>>>>> How can the handler differentiate between something temporary, where it
> > >>>>>> should keep retrying, and something permanent, like forgetting to create
> > >>>>>> the topic? Temporary here could be:
> > >>>>>> - the producer getting deployed before the topic creation finishes
> > >>>>>>   (especially if topic creation is handled via IaC)
> > >>>>>> - temporarily offline partitions
> > >>>>>> - leadership changes
> > >>>>>> Isn’t this putting the producer at risk of dropping records
> > >>>>>> unintentionally?
> > >>>>>> - Can you elaborate a bit more on the compatibility / migration plan
> > >>>>>> section, please, by explaining in more detail what the changed behaviour
> > >>>>>> is and how it will impact clients that are upgrading?
> > >>>>>> - In the proposed changes, can you elaborate in the KIP on where in the
> > >>>>>> producer lifecycle ClientExceptionHandler and
> > >>>>>> TransactionExceptionHandler get triggered, and on how the producer will
> > >>>>>> be configured to point to a custom implementation?
> > >>>>>>
> > >>>>>> Thanks
> > >>>>>> Omnia
> > >>>>>>
> > >>>>>>> On 17 Apr 2024, at 13:13, Alieh Saeedi
> > <asae...@confluent.io.INVALID
> > >>>
> > >>>>>> wrote:
> > >>>>>>>
> > >>>>>>> Hi all,
> > >>>>>>> Here is the KIP-1038: Add Custom Error Handler to Producer.
> > >>>>>>> <
> > >>>>>>
> > >>>>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1038%3A+Add+Custom+Error+Handler+to+Producer
> > >>>>>>>
> > >>>>>>> I look forward to your feedback!
> > >>>>>>>
> > >>>>>>> Cheers,
> > >>>>>>> Alieh
> > >>>>>>
> > >>>>>>
> > >>>>>
> > >>>
> > >>
> >
> >
>
