Alieh,

Having run into issues with not being able to handle producer failures, I think this is good functionality to have.
With this new functionality proposed at the Producer level, how would ecosystems that sit on top of it function? Specifically, Connect was updated a few years ago to allow Source Connect Workers to handle producer exceptions that would never succeed when the source data was bad.

Knowles

On Tue, Apr 23, 2024 at 5:23 AM Alieh Saeedi <asae...@confluent.io.invalid> wrote:

> Thanks Matthias. I changed it to `custom.exception.handler`
>
> Alieh
>
> On Tue, Apr 23, 2024 at 8:47 AM Matthias J. Sax <mj...@apache.org> wrote:
>
> > Thanks Alieh!
> >
> > A few nits:
> >
> > 1) The new config we add for the producer should be mentioned in the "Public Interfaces" section.
> >
> > 2) Why do we use the `producer.` prefix for a *producer* config? Should it be `exception.handler` only?
> >
> > -Matthias
> >
> > On 4/22/24 7:38 AM, Alieh Saeedi wrote:
> > > Thank you all for the feedback!
> > >
> > > Addressing the main concern: The KIP is about giving the user the ability to handle producer exceptions, but to be more conservative and avoid future issues, we decided to limit it to a short list of exceptions. I included *RecordTooLargeException* and *UnknownTopicOrPartitionException*. Open to suggestions for adding some more ;-)
> > >
> > > KIP updates:
> > > - Clarified the way the user should configure the producer to use the custom handler. I think adding a producer config property is the cleanest option.
> > > - Changed *ClientExceptionHandler* to *ProducerExceptionHandler* to be closer to what we are changing.
> > > - Added the ProducerRecord as an input parameter of the handle() method as well.
> > > - Increased the response types to 3, to have fail and two types of continue.
> > > - The default behaviour is having no custom handler, with the corresponding config parameter set to null. Therefore, the KIP provides no default implementation of the interface.
> > > - We follow the interface solution as described in the Rejected Alternatives section.
> > >
> > > Cheers,
> > > Alieh
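For illustration only, a minimal sketch of the handler shape described above, assuming names pieced together from this thread (ProducerExceptionHandler, a handle() method that receives the record and the exception, and a three-way response as suggested further down); the actual KIP-1038 interface may differ:

    // Sketch only -- not the final KIP-1038 API.
    import org.apache.kafka.clients.producer.ProducerRecord;

    public interface ProducerExceptionHandler {

        enum Response {
            FAIL,     // surface the exception to the application
            RETRY,    // keep retrying internally (retriable errors only)
            SWALLOW   // drop the record and continue
        }

        // Invoked for the covered exceptions, e.g. RecordTooLargeException or
        // UnknownTopicOrPartitionException, with the record that triggered them.
        Response handle(ProducerRecord<?, ?> record, Exception exception);
    }

Splitting "continue" into RETRY and SWALLOW like this is what avoids the semantic overload of a single CONTINUE value that Matthias raises below.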
> > > On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax <mj...@apache.org> wrote:
> > >
> > >> Thanks for the KIP Alieh! It addresses an important case for error handling.
> > >>
> > >> I agree that using this handler would be an expert API, as mentioned by a few people. But I don't think that is a reason to not add it. It's always a tricky tradeoff deciding what to expose to users while avoiding foot guns, but we added similar handlers to Kafka Streams and have good experience with them. Hence, I understand, but don't share, the concern raised.
> > >>
> > >> I also agree that there is some responsibility on the user to understand how such a handler should be implemented so as not to drop data by accident. But that seems unavoidable and acceptable.
> > >>
> > >> While I understand that a "simpler / reduced" API (e.g. via configs) might also work, I personally prefer a full handler. Configs have the same issue that they could be misused, potentially leading to incorrectly dropped data, but at the same time they are less flexible (and thus maybe even harder to use correctly...?). Based on my experience, there are also often weird corner cases for which it makes sense to also drop records for other exceptions, and a full handler has the advantage of full flexibility and "absolute power!"
> > >>
> > >> To be fair: I don't know the exact code paths of the producer in detail, so please keep me honest. But my understanding is that the KIP aims to allow users to react to internal exceptions and decide to keep retrying internally, swallow the error and drop the record, or raise the error?
> > >>
> > >> Maybe the KIP would need to be a little more precise about which errors we want to cover -- I don't think this list must be exhaustive, as we can always do follow-up KIPs to apply the handler to other errors and expand its scope. The KIP does mention examples, but it might be good to explicitly state for which cases the handler gets applied?
> > >>
> > >> I am also not sure if CONTINUE and FAIL are enough options? Don't we need three options? Or would `CONTINUE` have a different meaning depending on the type of error? I.e., for a retryable error `CONTINUE` would mean keep retrying internally, but for a non-retryable error `CONTINUE` means swallow the error and drop the record? This semantic overload seems tricky for users to reason about, so it might be better to split `CONTINUE` into two cases -> `RETRY` and `SWALLOW` (or some better names).
> > >>
> > >> Additionally, should we just ship a `DefaultClientExceptionHandler` which would return `FAIL`, for backward compatibility? Or should we have no default handler to begin with and allow it to be `null`? I don't see the need for a specific `TransactionExceptionHandler`. To me, the goal should be to not modify the default behavior at all, but to just allow users to change the default behavior if there is a need.
> > >>
> > >> What is missing from the KIP, though, is how the handler is passed into the producer. Would we need a new config which allows setting a custom handler? And/or would we allow passing in an instance via the constructor, or add a new method to set a handler?
> > >>
> > >> -Matthias
> > >>
> > >> On 4/18/24 10:02 AM, Andrew Schofield wrote:
> > >>> Hi Alieh,
> > >>> Thanks for the KIP.
> > >>>
> > >>> Exception handling in the Kafka producer and consumer is really not ideal. It's even harder working out what's going on with the consumer.
> > >>>
> > >>> I'm a bit nervous about this KIP and I agree with Chris that it could do with additional motivation. This would be an expert-level interface given how complicated the exception handling for Kafka has become.
> > >>>
> > >>> 7. The application is not really aware of the batching being done on its behalf. The ProduceResponse can actually return an array of records which failed per batch. If you get RecordTooLargeException and want to retry, you probably need to remove the offending records from the batch and retry it. This is getting fiddly.
> > >>>
> > >>> 8. There is already o.a.k.clients.producer.Callback. I wonder whether an alternative might be to add a method to the existing Callback interface, such as:
> > >>>
> > >>> ClientExceptionResponse onException(Exception exception)
> > >>>
> > >>> It would be called when a ProduceResponse contains an error, but the producer is going to retry. It tells the producer whether to go ahead with the retry or not.
> > >>> The default implementation would be to CONTINUE, because that's just continuing to retry as planned. Note that this is a per-record callback, so the application would be able to understand which records failed.
> > >>>
> > >>> By using an existing interface, we already know how to configure it and we know about the threading model for calling it.
> > >>>
> > >>> Thanks,
> > >>> Andrew
> > >>>
> > >>>> On 17 Apr 2024, at 18:17, Chris Egerton <chr...@aiven.io.INVALID> wrote:
> > >>>>
> > >>>> Hi Alieh,
> > >>>>
> > >>>> Thanks for the KIP! The issue with writing to non-existent topics is particularly frustrating for users of Kafka Connect and has been the source of a handful of Jira tickets over the years. My thoughts:
> > >>>>
> > >>>> 1. An additional detail we can add to the motivation (or possibly rejected alternatives) section is that this kind of custom retry logic can't be implemented by hand by, e.g., setting retries to 0 in the producer config and handling exceptions at the application level. Or rather, it can, but 1) it's a bit painful to have to reimplement at every call site for Producer::send (and any code that awaits the returned Future) and 2) it's impossible to do this without losing idempotency on retries.
> > >>>>
> > >>>> 2. That said, I wonder if a pluggable interface is really the right call here. Documenting the interactions of a producer with a ClientExceptionHandler instance will be tricky, and implementing them will also be a fair amount of work. I believe that there needs to be some more granularity for how writes to non-existent topics (or really, UNKNOWN_TOPIC_OR_PARTITION and related errors from the broker) are handled, but I'm torn between keeping it simple with maybe one or two new producer config properties, or a full-blown pluggable interface. If there are more cases that would benefit from a pluggable interface, it would be nice to identify these and add them to the KIP to strengthen the motivation. Right now, I'm not sure the two that are mentioned in the motivation are sufficient.
> > >>>>
> > >>>> 3. Alternatively, a possible compromise is for this KIP to introduce new properties that dictate how to handle unknown-topic-partition and record-too-large errors, with the thinking that if we introduce a pluggable interface later on, these properties will be recognized by the default implementation of that interface but could be completely ignored or replaced by alternative implementations.
> > >>>>
> > >>>> 4. (Nit) You can remove the "This page is meant as a template for writing a KIP..." part from the KIP. It's not a template anymore :)
> > >>>>
> > >>>> 5. If we do go the pluggable interface route, wouldn't we want to add the possibility for retry logic? The simplest version of this could be to add a RETRY value to the ClientExceptionHandlerResponse enum.
> > >>>>
> > >>>> 6. I think "SKIP" or "DROP" might be clearer than "CONTINUE" for the ClientExceptionHandlerResponse enum, since they cause records to be dropped.
> > >>>>
> > >>>> Cheers,
> > >>>>
> > >>>> Chris
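To make point 3 above concrete, a config-only compromise could look roughly like the following sketch; the two error-handling property names are hypothetical and exist in no Kafka release:

    // Hypothetical config-only variant of the compromise in point 3; the two
    // "errors.*" property names below are invented purely for illustration.
    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class ConfigOnlySketch {
        static Properties producerProps() {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put("errors.unknown.topic.partition", "retry"); // hypothetical: "fail" | "retry" | "skip"
            props.put("errors.record.too.large", "skip");         // hypothetical
            return props;
        }
    }

A later pluggable interface could then honour these properties in its default implementation, as the point suggests, without affecting users who never set them.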
> > >>>> On Wed, Apr 17, 2024 at 12:25 PM Justine Olshan <jols...@confluent.io.invalid> wrote:
> > >>>>
> > >>>>> Hey Alieh,
> > >>>>>
> > >>>>> I echo what Omnia says: I'm not sure I understand the implications of the change, and I think more detail is needed.
> > >>>>>
> > >>>>> This comment also confused me a bit:
> > >>>>> * {@code ClientExceptionHandler} that continues the transaction even if a record is too large.
> > >>>>> * Otherwise, it makes the transaction to fail.
> > >>>>>
> > >>>>> Relatedly, I've been working with some folks on a KIP for transaction errors and how they are handled. Specifically for the RecordTooLargeException (and a few other errors), we want to give a new error category for this error that allows the application to choose how it is handled. Maybe this KIP is something that you are looking for? Stay tuned :)
> > >>>>>
> > >>>>> Justine
> > >>>>>
> > >>>>> On Wed, Apr 17, 2024 at 8:03 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
> > >>>>>
> > >>>>>> Hi Alieh,
> > >>>>>> Thanks for the KIP! I have a couple of comments.
> > >>>>>>
> > >>>>>> - You mentioned in the KIP motivation:
> > >>>>>>> Another example for which a production exception handler could be useful is if a user tries to write into a non-existing topic, which returns a retryable error code; with infinite retries, the producer would hang retrying forever. A handler could help to break the infinite retry loop.
> > >>>>>>
> > >>>>>> How can the handler differentiate between something temporary, where it should keep retrying, and something permanent, like forgetting to create the topic? Temporary here could be:
> > >>>>>> - the producer getting deployed before the topic creation finishes (especially if topic creation is handled via IaC)
> > >>>>>> - temporarily offline partitions
> > >>>>>> - leadership changes
> > >>>>>> Isn't this putting the producer at risk of dropping records unintentionally?
> > >>>>>>
> > >>>>>> - Can you elaborate on the compatibility / migration plan section, please, by explaining in a bit more detail what the changed behaviour is and how it will impact clients who are upgrading?
> > >>>>>>
> > >>>>>> - In the proposed changes, can you elaborate in the KIP on where in the producer lifecycle ClientExceptionHandler and TransactionExceptionHandler get triggered, and how the producer will be configured to point to a custom implementation?
> > >>>>>>
> > >>>>>> Thanks
> > >>>>>> Omnia
> > >>>>>>
> > >>>>>>> On 17 Apr 2024, at 13:13, Alieh Saeedi <asae...@confluent.io.INVALID> wrote:
> > >>>>>>>
> > >>>>>>> Hi all,
> > >>>>>>> Here is KIP-1038: Add Custom Error Handler to Producer.
> > >>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1038%3A+Add+Custom+Error+Handler+to+Producer>
> > >>>>>>> I look forward to your feedback!
> > >>>>>>>
> > >>>>>>> Cheers,
> > >>>>>>> Alieh
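To close the loop for readers of this thread, a rough end-to-end usage sketch follows. It assumes the interface sketched earlier in the thread and the `custom.exception.handler` config name mentioned near the top; the handler class and its semantics are illustrative assumptions, not the final KIP-1038 API:

    // Usage sketch only -- assumes the ProducerExceptionHandler interface
    // sketched earlier in this thread; not the final KIP-1038 API.
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.RecordTooLargeException;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class SwallowTooLargeHandler implements ProducerExceptionHandler {

        @Override
        public Response handle(ProducerRecord<?, ?> record, Exception exception) {
            // Drop oversized records; surface every other covered error to the application.
            return exception instanceof RecordTooLargeException ? Response.SWALLOW : Response.FAIL;
        }

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // Wiring follows the `custom.exception.handler` naming discussed in this thread.
            props.put("custom.exception.handler", SwallowTooLargeHandler.class.getName());
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("my-topic", "key", "value"));
            }
        }
    }

Whether the producer would take such a handler via config, constructor, or a setter is exactly the open question raised earlier in the thread.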