Hi Alieh,

Thanks for the updates!
Comments inline...

> On Apr 25, 2024, at 1:10 PM, Alieh Saeedi <asae...@confluent.io.INVALID> wrote:
>
> Hi all,
>
> Thanks a lot for the constructive feedbacks!
>
> Addressing some of the main concerns:
>
> - The `RecordTooLargeException` can be thrown by broker, producer and
> consumer. Of course, the `ProducerExceptionHandler` interface is introduced
> to affect only the exceptions thrown from the producer. This KIP very
> specifically means to provide a possibility to manage the
> `RecordTooLargeException` thrown from the Producer.send() method. Please
> see “Proposed Changes” section for more clarity. I investigated the issue
> there thoroughly. I hope it can explain the concern about how we handle the
> errors as well.
>
> - The problem with Callback: Methods of Callback are called when the record
> sent to the server is acknowledged, while this is not the desired time for
> all exceptions. We intend to handle exceptions beforehand.

I guess it makes sense to keep the expectation for when Callback is invoked
as-is vs. shoehorning more into it.

> - What if the custom handler returns RETRY for `RecordTooLargeException`? I
> assume changing the producer configuration at runtime is possible. If so,
> RETRY for a too large record is valid because maybe in the next try, the
> too large record is not poisoning any more. I am not 100% sure about the
> technical details, though. Otherwise, we can consider the RETRY as FAIL for
> this exception. Another solution would be to consider a constant number of
> times for RETRY which can be useful for other exceptions as well.

It’s not presently possible to change the configuration of an existing
Producer at runtime. So if a record hits a RecordTooLargeException once, no
amount of retrying (with the current Producer) will change that fact. So I’m
still a little stuck on how to handle a response of RETRY for an “oversized”
record.

> - What if the handle() method itself throws an exception?
> I think
> rationally and pragmatically, the behaviour must be exactly like when no
> custom handler is defined since the user actually did not have a working
> handler.

I’m not convinced that ignoring an errant handler is the right choice. It
then becomes a silent failure that might have repercussions, depending on
the business logic. A user would have to proactively trawl through the logs
for WARN/ERROR messages to catch it. Throwing a hard error is pretty
draconian, though…

> - Why not use config parameters instead of an interface? As explained in
> the “Rejected Alternatives” section, we assume that the handler will be
> used for a greater number of exceptions in the future. Defining a
> configuration parameter for each exception may make the configuration a bit
> messy. Moreover, the handler offers more flexibility.

Agreed that the logic-via-configuration approach is weird and limiting.
Forget I ever suggested it ;) I’d think additional background in the
Motivation section would help me understand how users might use this
feature beyond a) skipping “oversized” records, and b) not retrying missing
topics.

> Small change:
>
> -ProductionExceptionHandlerResponse -> Response for brevity and simplicity.
> Could’ve been HandlerResponse too I think!

The name change sounds good to me.

Thanks Alieh!

> I thank you all again for your useful questions/suggestions.
>
> I would be happy to hear more of your concerns, as stated in some feedback.
>
> Cheers,
> Alieh
>
> On Wed, Apr 24, 2024 at 12:31 AM Justine Olshan
> <jols...@confluent.io.invalid> wrote:
>
>> Thanks Alieh for the updates.
>>
>> I'm a little concerned about the design pattern here. It seems like we want
>> specific usages, but we are packaging it as a generic handler.
>> I think we tried to narrow down on the specific errors we want to handle,
>> but it feels a little clunky as we have a generic thing for two specific
>> errors.
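Riffing on Justine’s request for an example: here’s a minimal sketch of what such a handler might look like for the two errors in the KIP. Caveat: all type names (`ProducerExceptionHandler`, the `Response` enum, the stand-in exception classes, `ExampleHandler`) are my assumptions from this thread, not a final API, and the KIP also passes the ProducerRecord into handle(), which I’ve omitted here for brevity.

```java
// Stand-in exception classes mirroring org.apache.kafka.common.errors,
// defined locally so the sketch is self-contained.
class RecordTooLargeException extends RuntimeException {}
class UnknownTopicOrPartitionException extends RuntimeException {}

// Hypothetical handler interface as discussed in KIP-1038; names are
// assumptions, not the final API.
interface ProducerExceptionHandler {
    enum Response { FAIL, RETRY, SWALLOW }
    Response handle(Exception exception);
}

// Example policy: drop oversized records, retry missing topics, fail
// on everything else (preserving today's default behavior).
class ExampleHandler implements ProducerExceptionHandler {
    @Override
    public Response handle(Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            // An oversized record will never fit on a retry, since
            // producer configs can't change at runtime, so drop it.
            return Response.SWALLOW;
        }
        if (exception instanceof UnknownTopicOrPartitionException) {
            // The topic may simply not exist *yet*, so keep retrying
            // instead of hanging or failing hard.
            return Response.RETRY;
        }
        // Anything else: surface the error as the producer does today.
        return Response.FAIL;
    }
}
```

With the ProducerRecord also passed in (as the KIP update describes), the same shape would let a handler vary its decision per topic, which speaks to my K6 question.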
>>
>> I'm wondering if we are using the right patterns to solve these problems. I
>> agree though that we will need something more than the error classes I'm
>> proposing if we want to have different handling be configurable.
>> My concern is that the open-endedness of a handler means that we are
>> creating more problems than we are solving. It is still unclear to me how
>> we expect to handle the errors. Perhaps we could include an example? It
>> seems like there is a specific use case in mind and maybe we can make a
>> design that is tighter and supports that case.
>>
>> Justine
>>
>> On Tue, Apr 23, 2024 at 3:06 PM Kirk True <k...@kirktrue.pro> wrote:
>>
>>> Hi Alieh,
>>>
>>> Thanks for the KIP!
>>>
>>> A few questions:
>>>
>>> K1. What is the expected behavior for the producer if it generates a
>>> RecordTooLargeException, but the handler returns RETRY?
>>> K2. How do we determine which Record was responsible for the
>>> UnknownTopicOrPartitionException since we get that response when sending
>>> a batch of records?
>>> K3. What is the expected behavior if the handle() method itself throws an
>>> error?
>>> K4. What is the downside of adding an onError() method to the Producer’s
>>> Callback interface vs. a new mechanism?
>>> K5. Can we change “ProducerExceptionHandlerResponse” to just “Response”
>>> given that it’s an inner enum?
>>> K6. Any recommendation for callback authors to handle different behavior
>>> for different topics?
>>>
>>> I’ll echo what others have said, it would help me understand why we want
>>> another handler class if there were more examples in the Motivation
>>> section. As it stands now, I agree with Chris that the stated issues
>>> could be solved by adding two new configuration options:
>>>
>>> oversized.record.behavior=fail
>>> retry.on.unknown.topic.or.partition=true
>>>
>>> What I’m not yet able to wrap my head around is: what exactly would the
>>> logic in the handler be?
>>> I’m not very imaginative, so I’m assuming they’d
>>> mostly be if-this-then-that. However, if they’re more complicated, I’d
>>> have other concerns.
>>>
>>> Thanks,
>>> Kirk
>>>
>>>> On Apr 22, 2024, at 7:38 AM, Alieh Saeedi <asae...@confluent.io.INVALID>
>>>> wrote:
>>>>
>>>> Thank you all for the feedback!
>>>>
>>>> Addressing the main concern: The KIP is about giving the user the
>>>> ability to handle producer exceptions, but to be more conservative and
>>>> avoid future issues, we decided to be limited to a short list of
>>>> exceptions. I included *RecordTooLargeExceptin* and
>>>> *UnknownTopicOrPartitionException*. Open to suggestion for adding some
>>>> more ;-)
>>>>
>>>> KIP Updates:
>>>> - clarified the way that the user should configure the Producer to use
>>>> the custom handler. I think adding a producer config property is the
>>>> cleanest one.
>>>> - changed the *ClientExceptionHandler* to *ProducerExceptionHandler* to
>>>> be closer to what we are changing.
>>>> - added the ProducerRecord as the input parameter of the handle() method
>>>> as well.
>>>> - increased the response types to 3 to have fail and two types of
>>>> continue.
>>>> - The default behaviour is having no custom handler, having the
>>>> corresponding config parameter set to null. Therefore, the KIP provides
>>>> no default implementation of the interface.
>>>> - We follow the interface solution as described in the
>>>> Rejected Alternetives section.
>>>>
>>>> Cheers,
>>>> Alieh
>>>>
>>>> On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax <mj...@apache.org>
>>>> wrote:
>>>>
>>>>> Thanks for the KIP Alieh! It addresses an important case for error
>>>>> handling.
>>>>>
>>>>> I agree that using this handler would be an expert API, as mentioned by
>>>>> a few people. But I don't think it would be a reason to not add it.
>>>>> It's always a tricky tradeoff what to expose to users and to avoid
>>>>> foot guns, but we added similar handlers to Kafka Streams, and have
>>>>> good experience with it. Hence, I understand, but don't share the
>>>>> concern raised.
>>>>>
>>>>> I also agree that there is some responsibility by the user to
>>>>> understand how such a handler should be implemented to not drop data
>>>>> by accident. But it seem unavoidable and acceptable.
>>>>>
>>>>> While I understand that a "simpler / reduced" API (eg via configs)
>>>>> might also work, I personally prefer a full handler. Configs have the
>>>>> same issue that they could be miss-used potentially leading to
>>>>> incorrectly dropped data, but at the same time are less flexible (and
>>>>> thus maybe ever harder to use correctly...?). Base on my experience,
>>>>> there is also often weird corner case for which it make sense to also
>>>>> drop records for other exceptions, and a full handler has the
>>>>> advantage of full flexibility and "absolute power!".
>>>>>
>>>>> To be fair: I don't know the exact code paths of the producer in
>>>>> details, so please keep me honest. But my understanding is, that the
>>>>> KIP aims to allow users to react to internal exception, and decide to
>>>>> keep retrying internally, swallow the error and drop the record, or
>>>>> raise the error?
>>>>>
>>>>> Maybe the KIP would need to be a little bit more precises what error
>>>>> we want to cover -- I don't think this list must be exhaustive, as we
>>>>> can always do follow up KIP to also apply the handler to other errors
>>>>> to expand the scope of the handler. The KIP does mention examples, but
>>>>> it might be good to explicitly state for what cases the handler gets
>>>>> applied?
>>>>>
>>>>> I am also not sure if CONTINUE and FAIL are enough options? Don't we
>>>>> need three options? Or would `CONTINUE` have different meaning
>>>>> depending on the type of error?
>>>>> Ie, for a retryable error `CONTINUE` would mean keep retrying
>>>>> internally, but for a non-retryable error `CONTINUE` means swallow the
>>>>> error and drop the record? This semantic overload seems tricky to
>>>>> reason about by users, so it might better to split `CONTINUE` into two
>>>>> cases -> `RETRY` and `SWALLOW` (or some better names).
>>>>>
>>>>> Additionally, should we just ship a `DefaultClientExceptionHandler`
>>>>> which would return `FAIL`, for backward compatibility. Or don't have
>>>>> any default handler to begin with and allow it to be `null`? I don't
>>>>> see the need for a specific `TransactionExceptionHandler`. To me, the
>>>>> goal should be to not modify the default behavior at all, but to just
>>>>> allow users to change the default behavior if there is a need.
>>>>>
>>>>> What is missing on the KIP though it, how the handler is passed into
>>>>> the producer thought? Would we need a new config which allows to set a
>>>>> custom handler? And/or would we allow to pass in an instance via the
>>>>> constructor or add a new method to set a handler?
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 4/18/24 10:02 AM, Andrew Schofield wrote:
>>>>>> Hi Alieh,
>>>>>> Thanks for the KIP.
>>>>>>
>>>>>> Exception handling in the Kafka producer and consumer is really not
>>>>>> ideal. It’s even harder working out what’s going on with the consumer.
>>>>>>
>>>>>> I’m a bit nervous about this KIP and I agree with Chris that it could
>>>>>> do with additional motivation. This would be an expert-level
>>>>>> interface given how complicated the exception handling for Kafka has
>>>>>> become.
>>>>>>
>>>>>> 7. The application is not really aware of the batching being done on
>>>>>> its behalf. The ProduceResponse can actually return an array of
>>>>>> records which failed per batch.
>>>>>> If you get RecordTooLargeException, and want to retry, you probably
>>>>>> need to remove the offending records from the batch and retry it.
>>>>>> This is getting fiddly.
>>>>>>
>>>>>> 8. There is already o.a.k.clients.producer.Callback. I wonder whether
>>>>>> an alternative might be to add a method to the existing Callback
>>>>>> interface, such as:
>>>>>>
>>>>>> ClientExceptionResponse onException(Exception exception)
>>>>>>
>>>>>> It would be called when a ProduceResponse contains an error, but the
>>>>>> producer is going to retry. It tells the producer whether to go ahead
>>>>>> with the retry or not. The default implementation would be to
>>>>>> CONTINUE, because that’s just continuing to retry as planned. Note
>>>>>> that this is a per-record callback, so the application would be able
>>>>>> to understand which records failed.
>>>>>>
>>>>>> By using an existing interface, we already know how to configure it
>>>>>> and we know about the threading model for calling it.
>>>>>>
>>>>>> Thanks,
>>>>>> Andrew
>>>>>>
>>>>>>> On 17 Apr 2024, at 18:17, Chris Egerton <chr...@aiven.io.INVALID>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi Alieh,
>>>>>>>
>>>>>>> Thanks for the KIP! The issue with writing to non-existent topics is
>>>>>>> particularly frustrating for users of Kafka Connect and has been the
>>>>>>> source of a handful of Jira tickets over the years. My thoughts:
>>>>>>>
>>>>>>> 1. An additional detail we can add to the motivation (or possibly
>>>>>>> rejected alternatives) section is that this kind of custom retry
>>>>>>> logic can't be implemented by hand by, e.g., setting retries to 0 in
>>>>>>> the producer config and handling exceptions at the application level.
>>>>>>> Or rather, it can, but 1) it's a bit painful to have to reimplement
>>>>>>> at every call-site for Producer::send (and any code that awaits the
>>>>>>> returned Future) and 2) it's impossible to do this without losing
>>>>>>> idempotency on retries.
>>>>>>>
>>>>>>> 2. That said, I wonder if a pluggable interface is really the right
>>>>>>> call here. Documenting the interactions of a producer with a
>>>>>>> ClientExceptionHandler instance will be tricky, and implementing
>>>>>>> them will also be a fair amount of work. I believe that there needs
>>>>>>> to be some more granularity for how writes to non-existent topics
>>>>>>> (or really, UNKNOWN_TOPIC_OR_PARTITION and related errors from the
>>>>>>> broker) are handled, but I'm torn between keeping it simple with
>>>>>>> maybe one or two new producer config properties, or a full-blown
>>>>>>> pluggable interface. If there are more cases that would benefit from
>>>>>>> a pluggable interface, it would be nice to identify these and add
>>>>>>> them to the KIP to strengthen the motivation. Right now, I'm not
>>>>>>> sure the two that are mentioned in the motivation are sufficient.
>>>>>>>
>>>>>>> 3. Alternatively, a possible compromise is for this KIP to introduce
>>>>>>> new properties that dictate how to handle unknown-topic-partition
>>>>>>> and record-too-large errors, with the thinking that if we introduce
>>>>>>> a pluggable interface later on, these properties will be recognized
>>>>>>> by the default implementation of that interface but could be
>>>>>>> completely ignored or replaced by alternative implementations.
>>>>>>>
>>>>>>> 4. (Nit) You can remove the "This page is meant as a template for
>>>>>>> writing a KIP..." part from the KIP. It's not a template anymore :)
>>>>>>>
>>>>>>> 5. If we do go the pluggable interface route, wouldn't we want to
>>>>>>> add the possibility for retry logic?
>>>>>>> The simplest version of this could be to add a RETRY value to the
>>>>>>> ClientExceptionHandlerResponse enum.
>>>>>>>
>>>>>>> 6. I think "SKIP" or "DROP" might be clearer instead of "CONTINUE"
>>>>>>> for the ClientExceptionHandlerResponse enum, since they cause
>>>>>>> records to be dropped.
>>>>>>>
>>>>>>> Cheers,
>>>>>>>
>>>>>>> Chris
>>>>>>>
>>>>>>> On Wed, Apr 17, 2024 at 12:25 PM Justine Olshan
>>>>>>> <jols...@confluent.io.invalid> wrote:
>>>>>>>
>>>>>>>> Hey Alieh,
>>>>>>>>
>>>>>>>> I echo what Omnia says, I'm not sure I understand the implications
>>>>>>>> of the change and I think more detail is needed.
>>>>>>>>
>>>>>>>> This comment also confused me a bit:
>>>>>>>> * {@code ClientExceptionHandler} that continues the transaction
>>>>>>>> even if a record is too large.
>>>>>>>> * Otherwise, it makes the transaction to fail.
>>>>>>>>
>>>>>>>> Relatedly, I've been working with some folks on a KIP for
>>>>>>>> transactions errors and how they are handled. Specifically for the
>>>>>>>> RecordTooLargeException (and a few other errors), we want to give a
>>>>>>>> new error category for this error that allows the application to
>>>>>>>> choose how it is handled. Maybe this KIP is something that you are
>>>>>>>> looking for? Stay tuned :)
>>>>>>>>
>>>>>>>> Justine
>>>>>>>>
>>>>>>>> On Wed, Apr 17, 2024 at 8:03 AM Omnia Ibrahim
>>>>>>>> <o.g.h.ibra...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Alieh,
>>>>>>>>> Thanks for the KIP! I have couple of comments
>>>>>>>>> - You mentioned in the KIP motivation,
>>>>>>>>>> Another example for which a production exception handler could be
>>>>>>>>>> useful is if a user tries to write into a non-existing topic,
>>>>>>>>>> which returns a retryable error code; with infinite retries, the
>>>>>>>>>> producer would hang retrying forever. A handler could help to
>>>>>>>>>> break the infinite retry loop.
>>>>>>>>>
>>>>>>>>> How the handler can differentiate between something that is
>>>>>>>>> temporary and it should keep retrying and something permanent like
>>>>>>>>> forgot to create the topic? temporary here could be
>>>>>>>>> the producer get deployed before the topic creation finish
>>>>>>>>> (specially if the topic creation is handled via IaC)
>>>>>>>>> temporary offline partitions
>>>>>>>>> leadership changing
>>>>>>>>> Isn’t this putting the producer at risk of dropping records
>>>>>>>>> unintentionally?
>>>>>>>>> - Can you elaborate more on what is written in the compatibility /
>>>>>>>>> migration plan section please by explaining in bit more details
>>>>>>>>> what is the changing behaviour and how this will impact client who
>>>>>>>>> are upgrading?
>>>>>>>>> - In the proposal changes can you elaborate in the KIP where in
>>>>>>>>> the producer lifecycle will ClientExceptionHandler and
>>>>>>>>> TransactionExceptionHandler get triggered, and how will the
>>>>>>>>> producer configure them to point to costumed implementation.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Omnia
>>>>>>>>>
>>>>>>>>>> On 17 Apr 2024, at 13:13, Alieh Saeedi
>>>>>>>>>> <asae...@confluent.io.INVALID> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi all,
>>>>>>>>>> Here is the KIP-1038: Add Custom Error Handler to Producer.
>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1038%3A+Add+Custom+Error+Handler+to+Producer>
>>>>>>>>>>
>>>>>>>>>> I look forward to your feedback!
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Alieh
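P.S. To make Omnia’s temporary-vs-permanent concern concrete, and picking up Alieh’s earlier idea of allowing only a constant number of RETRYs: a handler could bound its own retries, tolerating a transient window (topic still being created via IaC, offline partitions, leadership changes) before giving up. A minimal sketch follows; all type names are my assumptions from this thread, not a final API, and counting attempts in a single field is deliberately simplistic (a real handler would likely track attempts per record or per topic).

```java
// Stand-in exception class mirroring org.apache.kafka.common.errors,
// defined locally so the sketch is self-contained.
class UnknownTopicOrPartitionException extends RuntimeException {}

// Hypothetical handler interface as discussed in KIP-1038; names are
// assumptions, not the final API.
interface ProducerExceptionHandler {
    enum Response { FAIL, RETRY, SWALLOW }
    Response handle(Exception exception);
}

// Bounded-retry policy: retry a missing topic a fixed number of times
// (covering a transient window), then fail instead of looping forever.
class BoundedRetryHandler implements ProducerExceptionHandler {
    private final int maxRetries;
    private int attempts = 0; // simplistic: one counter for the whole handler

    BoundedRetryHandler(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    @Override
    public Response handle(Exception exception) {
        if (exception instanceof UnknownTopicOrPartitionException
                && attempts < maxRetries) {
            attempts++;
            return Response.RETRY; // still within the tolerated window
        }
        // Looks permanent (or not a case we handle): surface the error.
        return Response.FAIL;
    }
}
```

The point being: a bounded RETRY gives the handler a middle ground between hanging forever and silently dropping records.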