Hi all, Thanks a lot for the constructive feedbacks!
Addressing some of the main concerns: - The `RecordTooLargeException` can be thrown by broker, producer and consumer. Of course, the `ProducerExceptionHandler` interface is introduced to affect only the exceptions thrown from the producer. This KIP very specifically means to provide a possibility to manage the `RecordTooLargeException` thrown from the Producer.send() method. Please see “Proposed Changes” section for more clarity. I investigated the issue there thoroughly. I hope it can explain the concern about how we handle the errors as well. - The problem with Callback: Methods of Callback are called when the record sent to the server is acknowledged, while this is not the desired time for all exceptions. We intend to handle exceptions beforehand. - What if the custom handler returns RETRY for `RecordTooLargeException`? I assume changing the producer configuration at runtime is possible. If so, RETRY for a too large record is valid because maybe in the next try, the too large record is not poisoning any more. I am not 100% sure about the technical details, though. Otherwise, we can consider the RETRY as FAIL for this exception. Another solution would be to consider a constant number of times for RETRY which can be useful for other exceptions as well. - What if the handle() method itself throws an exception? I think rationally and pragmatically, the behaviour must be exactly like when no custom handler is defined since the user actually did not have a working handler. - Why not use config parameters instead of an interface? As explained in the “Rejected Alternatives” section, we assume that the handler will be used for a greater number of exceptions in the future. Defining a configuration parameter for each exception may make the configuration a bit messy. Moreover, the handler offers more flexibility. Small change: -ProductionExceptionHandlerResponse -> Response for brevity and simplicity. Could’ve been HandlerResponse too I think! I thank you all again for your useful questions/suggestions. I would be happy to hear more of your concerns, as stated in some feedback. Cheers, Alieh On Wed, Apr 24, 2024 at 12:31 AM Justine Olshan <jols...@confluent.io.invalid> wrote: > Thanks Alieh for the updates. > > I'm a little concerned about the design pattern here. It seems like we want > specific usages, but we are packaging it as a generic handler. > I think we tried to narrow down on the specific errors we want to handle, > but it feels a little clunky as we have a generic thing for two specific > errors. > > I'm wondering if we are using the right patterns to solve these problems. I > agree though that we will need something more than the error classes I'm > proposing if we want to have different handling be configurable. > My concern is that the open-endedness of a handler means that we are > creating more problems than we are solving. It is still unclear to me how > we expect to handle the errors. Perhaps we could include an example? It > seems like there is a specific use case in mind and maybe we can make a > design that is tighter and supports that case. > > Justine > > On Tue, Apr 23, 2024 at 3:06 PM Kirk True <k...@kirktrue.pro> wrote: > > > Hi Alieh, > > > > Thanks for the KIP! > > > > A few questions: > > > > K1. What is the expected behavior for the producer if it generates a > > RecordTooLargeException, but the handler returns RETRY? > > K2. How do we determine which Record was responsible for the > > UnknownTopicOrPartitionException since we get that response when > sending a > > batch of records? > > K3. What is the expected behavior if the handle() method itself throws an > > error? > > K4. What is the downside of adding an onError() method to the Producer’s > > Callback interface vs. a new mechanism? > > K5. Can we change “ProducerExceptionHandlerResponse" to just “Response” > > given that it’s an inner enum? > > K6. Any recommendation for callback authors to handle different behavior > > for different topics? > > > > I’ll echo what others have said, it would help me understand why we want > > another handler class if there were more examples in the Motivation > > section. As it stands now, I agree with Chris that the stated issues > could > > be solved by adding two new configuration options: > > > > oversized.record.behavior=fail > > retry.on.unknown.topic.or.partition=true > > > > What I’m not yet able to wrap my head around is: what exactly would the > > logic in the handler be? I’m not very imaginative, so I’m assuming they’d > > mostly be if-this-then-that. However, if they’re more complicated, I’d > have > > other concerns. > > > > Thanks, > > Kirk > > > > > On Apr 22, 2024, at 7:38 AM, Alieh Saeedi <asae...@confluent.io.INVALID > > > > wrote: > > > > > > Thank you all for the feedback! > > > > > > Addressing the main concern: The KIP is about giving the user the > ability > > > to handle producer exceptions, but to be more conservative and avoid > > future > > > issues, we decided to be limited to a short list of exceptions. I > > included > > > *RecordTooLargeExceptin* and *UnknownTopicOrPartitionException. *Open > to > > > suggestion for adding some more ;-) > > > > > > KIP Updates: > > > - clarified the way that the user should configure the Producer to use > > the > > > custom handler. I think adding a producer config property is the > cleanest > > > one. > > > - changed the *ClientExceptionHandler* to *ProducerExceptionHandler* to > > be > > > closer to what we are changing. > > > - added the ProducerRecord as the input parameter of the handle() > method > > as > > > well. > > > - increased the response types to 3 to have fail and two types of > > continue. > > > - The default behaviour is having no custom handler, having the > > > corresponding config parameter set to null. Therefore, the KIP provides > > no > > > default implementation of the interface. > > > - We follow the interface solution as described in the > > > Rejected Alternetives section. > > > > > > > > > Cheers, > > > Alieh > > > > > > > > > On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax <mj...@apache.org> > > wrote: > > > > > >> Thanks for the KIP Alieh! It addresses an important case for error > > >> handling. > > >> > > >> I agree that using this handler would be an expert API, as mentioned > by > > >> a few people. But I don't think it would be a reason to not add it. > It's > > >> always a tricky tradeoff what to expose to users and to avoid foot > guns, > > >> but we added similar handlers to Kafka Streams, and have good > experience > > >> with it. Hence, I understand, but don't share the concern raised. > > >> > > >> I also agree that there is some responsibility by the user to > understand > > >> how such a handler should be implemented to not drop data by accident. > > >> But it seem unavoidable and acceptable. > > >> > > >> While I understand that a "simpler / reduced" API (eg via configs) > might > > >> also work, I personally prefer a full handler. Configs have the same > > >> issue that they could be miss-used potentially leading to incorrectly > > >> dropped data, but at the same time are less flexible (and thus maybe > > >> ever harder to use correctly...?). Base on my experience, there is > also > > >> often weird corner case for which it make sense to also drop records > for > > >> other exceptions, and a full handler has the advantage of full > > >> flexibility and "absolute power!". > > >> > > >> To be fair: I don't know the exact code paths of the producer in > > >> details, so please keep me honest. But my understanding is, that the > KIP > > >> aims to allow users to react to internal exception, and decide to keep > > >> retrying internally, swallow the error and drop the record, or raise > the > > >> error? > > >> > > >> Maybe the KIP would need to be a little bit more precises what error > we > > >> want to cover -- I don't think this list must be exhaustive, as we can > > >> always do follow up KIP to also apply the handler to other errors to > > >> expand the scope of the handler. The KIP does mention examples, but it > > >> might be good to explicitly state for what cases the handler gets > > applied? > > >> > > >> I am also not sure if CONTINUE and FAIL are enough options? Don't we > > >> need three options? Or would `CONTINUE` have different meaning > depending > > >> on the type of error? Ie, for a retryable error `CONTINUE` would mean > > >> keep retrying internally, but for a non-retryable error `CONTINUE` > means > > >> swallow the error and drop the record? This semantic overload seems > > >> tricky to reason about by users, so it might better to split > `CONTINUE` > > >> into two cases -> `RETRY` and `SWALLOW` (or some better names). > > >> > > >> Additionally, should we just ship a `DefaultClientExceptionHandler` > > >> which would return `FAIL`, for backward compatibility. Or don't have > any > > >> default handler to begin with and allow it to be `null`? I don't see > the > > >> need for a specific `TransactionExceptionHandler`. To me, the goal > > >> should be to not modify the default behavior at all, but to just allow > > >> users to change the default behavior if there is a need. > > >> > > >> What is missing on the KIP though it, how the handler is passed into > the > > >> producer thought? Would we need a new config which allows to set a > > >> custom handler? And/or would we allow to pass in an instance via the > > >> constructor or add a new method to set a handler? > > >> > > >> > > >> -Matthias > > >> > > >> On 4/18/24 10:02 AM, Andrew Schofield wrote: > > >>> Hi Alieh, > > >>> Thanks for the KIP. > > >>> > > >>> Exception handling in the Kafka producer and consumer is really not > > >> ideal. > > >>> It’s even harder working out what’s going on with the consumer. > > >>> > > >>> I’m a bit nervous about this KIP and I agree with Chris that it could > > do > > >> with additional > > >>> motivation. This would be an expert-level interface given how > > complicated > > >>> the exception handling for Kafka has become. > > >>> > > >>> 7. The application is not really aware of the batching being done on > > its > > >> behalf. > > >>> The ProduceResponse can actually return an array of records which > > failed > > >>> per batch. If you get RecordTooLargeException, and want to retry, you > > >> probably > > >>> need to remove the offending records from the batch and retry it. > This > > >> is getting fiddly. > > >>> > > >>> 8. There is already o.a.k.clients.producer.Callback. I wonder whether > > an > > >>> alternative might be to add a method to the existing Callback > > interface, > > >> such as: > > >>> > > >>> ClientExceptionResponse onException(Exception exception) > > >>> > > >>> It would be called when a ProduceResponse contains an error, but the > > >>> producer is going to retry. It tells the producer whether to go ahead > > >> with the retry > > >>> or not. The default implementation would be to CONTINUE, because > that’s > > >>> just continuing to retry as planned. Note that this is a per-record > > >> callback, so > > >>> the application would be able to understand which records failed. > > >>> > > >>> By using an existing interface, we already know how to configure it > and > > >> we know > > >>> about the threading model for calling it. > > >>> > > >>> > > >>> Thanks, > > >>> Andrew > > >>> > > >>> > > >>> > > >>>> On 17 Apr 2024, at 18:17, Chris Egerton <chr...@aiven.io.INVALID> > > >> wrote: > > >>>> > > >>>> Hi Alieh, > > >>>> > > >>>> Thanks for the KIP! The issue with writing to non-existent topics is > > >>>> particularly frustrating for users of Kafka Connect and has been the > > >> source > > >>>> of a handful of Jira tickets over the years. My thoughts: > > >>>> > > >>>> 1. An additional detail we can add to the motivation (or possibly > > >> rejected > > >>>> alternatives) section is that this kind of custom retry logic can't > be > > >>>> implemented by hand by, e.g., setting retries to 0 in the producer > > >> config > > >>>> and handling exceptions at the application level. Or rather, it can, > > >> but 1) > > >>>> it's a bit painful to have to reimplement at every call-site for > > >>>> Producer::send (and any code that awaits the returned Future) and 2) > > >> it's > > >>>> impossible to do this without losing idempotency on retries. > > >>>> > > >>>> 2. That said, I wonder if a pluggable interface is really the right > > call > > >>>> here. Documenting the interactions of a producer with > > >>>> a ClientExceptionHandler instance will be tricky, and implementing > > them > > >>>> will also be a fair amount of work. I believe that there needs to be > > >> some > > >>>> more granularity for how writes to non-existent topics (or really, > > >>>> UNKNOWN_TOPIC_OR_PARTITION and related errors from the broker) are > > >> handled, > > >>>> but I'm torn between keeping it simple with maybe one or two new > > >> producer > > >>>> config properties, or a full-blown pluggable interface. If there are > > >> more > > >>>> cases that would benefit from a pluggable interface, it would be > nice > > to > > >>>> identify these and add them to the KIP to strengthen the motivation. > > >> Right > > >>>> now, I'm not sure the two that are mentioned in the motivation are > > >>>> sufficient. > > >>>> > > >>>> 3. Alternatively, a possible compromise is for this KIP to introduce > > new > > >>>> properties that dictate how to handle unknown-topic-partition and > > >>>> record-too-large errors, with the thinking that if we introduce a > > >> pluggable > > >>>> interface later on, these properties will be recognized by the > default > > >>>> implementation of that interface but could be completely ignored or > > >>>> replaced by alternative implementations. > > >>>> > > >>>> 4. (Nit) You can remove the "This page is meant as a template for > > >> writing a > > >>>> KIP..." part from the KIP. It's not a template anymore :) > > >>>> > > >>>> 5. If we do go the pluggable interface route, wouldn't we want to > add > > >> the > > >>>> possibility for retry logic? The simplest version of this could be > to > > >> add a > > >>>> RETRY value to the ClientExceptionHandlerResponse enum. > > >>>> > > >>>> 6. I think "SKIP" or "DROP" might be clearer instead of "CONTINUE" > for > > >>>> the ClientExceptionHandlerResponse enum, since they cause records to > > be > > >>>> dropped. > > >>>> > > >>>> Cheers, > > >>>> > > >>>> Chris > > >>>> > > >>>> On Wed, Apr 17, 2024 at 12:25 PM Justine Olshan > > >>>> <jols...@confluent.io.invalid> wrote: > > >>>> > > >>>>> Hey Alieh, > > >>>>> > > >>>>> I echo what Omnia says, I'm not sure I understand the implications > of > > >> the > > >>>>> change and I think more detail is needed. > > >>>>> > > >>>>> This comment also confused me a bit: > > >>>>> * {@code ClientExceptionHandler} that continues the transaction > even > > >> if a > > >>>>> record is too large. > > >>>>> * Otherwise, it makes the transaction to fail. > > >>>>> > > >>>>> Relatedly, I've been working with some folks on a KIP for > > transactions > > >>>>> errors and how they are handled. Specifically for the > > >>>>> RecordTooLargeException (and a few other errors), we want to give a > > new > > >>>>> error category for this error that allows the application to choose > > >> how it > > >>>>> is handled. Maybe this KIP is something that you are looking for? > > Stay > > >>>>> tuned :) > > >>>>> > > >>>>> Justine > > >>>>> > > >>>>> > > >>>>> > > >>>>> > > >>>>> > > >>>>> On Wed, Apr 17, 2024 at 8:03 AM Omnia Ibrahim < > > o.g.h.ibra...@gmail.com > > >>> > > >>>>> wrote: > > >>>>> > > >>>>>> Hi Alieh, > > >>>>>> Thanks for the KIP! I have couple of comments > > >>>>>> - You mentioned in the KIP motivation, > > >>>>>>> Another example for which a production exception handler could be > > >>>>> useful > > >>>>>> is if a user tries to write into a non-existing topic, which > > returns a > > >>>>>> retryable error code; with infinite retries, the producer would > hang > > >>>>>> retrying forever. A handler could help to break the infinite retry > > >> loop. > > >>>>>> > > >>>>>> How the handler can differentiate between something that is > > temporary > > >> and > > >>>>>> it should keep retrying and something permanent like forgot to > > create > > >> the > > >>>>>> topic? temporary here could be > > >>>>>> the producer get deployed before the topic creation finish > > (specially > > >> if > > >>>>>> the topic creation is handled via IaC) > > >>>>>> temporary offline partitions > > >>>>>> leadership changing > > >>>>>> Isn’t this putting the producer at risk of dropping records > > >>>>>> unintentionally? > > >>>>>> - Can you elaborate more on what is written in the compatibility / > > >>>>>> migration plan section please by explaining in bit more details > what > > >> is > > >>>>> the > > >>>>>> changing behaviour and how this will impact client who are > > upgrading? > > >>>>>> - In the proposal changes can you elaborate in the KIP where in > the > > >>>>>> producer lifecycle will ClientExceptionHandler and > > >>>>>> TransactionExceptionHandler get triggered, and how will the > producer > > >>>>>> configure them to point to costumed implementation. > > >>>>>> > > >>>>>> Thanks > > >>>>>> Omnia > > >>>>>> > > >>>>>>> On 17 Apr 2024, at 13:13, Alieh Saeedi > > <asae...@confluent.io.INVALID > > >>> > > >>>>>> wrote: > > >>>>>>> > > >>>>>>> Hi all, > > >>>>>>> Here is the KIP-1038: Add Custom Error Handler to Producer. > > >>>>>>> < > > >>>>>> > > >>>>> > > >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1038%3A+Add+Custom+Error+Handler+to+Producer > > >>>>>>> > > >>>>>>> I look forward to your feedback! > > >>>>>>> > > >>>>>>> Cheers, > > >>>>>>> Alieh > > >>>>>> > > >>>>>> > > >>>>> > > >>> > > >> > > > > >