Hi Alieh, Thank you for all the updates! One final question--how will the retry timeout for unknown topic partition errors be implemented? I think it would be best if this could be done with an implementation of the error handler, but I don't see a way to track the necessary information with the current ProducerExceptionHandler interface.
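To make the question concrete, here is a rough sketch of the bookkeeping a handler would need in order to enforce a retry timeout by itself. It is only an illustration under assumptions: the `onNewRecord` and `onRecordSuccess` hooks are the hypothetical callbacks floated earlier in this thread and are not part of the current ProducerExceptionHandler interface, `Response` is a simplified stand-in for the KIP's response types, and the clock is passed in explicitly to keep the example deterministic.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Map;

// Hypothetical sketch only: none of these names are the KIP's actual API.
final class RetryTimeoutSketch {
    // Simplified stand-in for the KIP's response types.
    enum Response { FAIL, RETRY }

    static final class TimeoutTrackingHandler {
        private final long retryTimeoutMs;
        // Records are tracked by identity, so two equal records do not share a start time.
        private final Map<Object, Long> firstSeenMs =
                Collections.synchronizedMap(new IdentityHashMap<>());

        TimeoutTrackingHandler(long retryTimeoutMs) {
            this.retryTimeoutMs = retryTimeoutMs;
        }

        // Hypothetical hook: invoked when a record is first handed to send().
        void onNewRecord(Object rec, long nowMs) {
            firstSeenMs.putIfAbsent(rec, nowMs);
        }

        // Hypothetical hook: invoked on success so the map does not grow without bound.
        void onRecordSuccess(Object rec) {
            firstSeenMs.remove(rec);
        }

        // RETRY until the record has been pending longer than the timeout, then FAIL.
        Response handleUnknownTopicOrPartition(Object rec, long nowMs) {
            Long start = firstSeenMs.get(rec);
            if (start == null || nowMs - start > retryTimeoutMs) {
                firstSeenMs.remove(rec);
                return Response.FAIL;
            }
            return Response.RETRY;
        }
    }

    public static void main(String[] args) {
        TimeoutTrackingHandler handler = new TimeoutTrackingHandler(1_000L);
        Object rec = new Object();
        handler.onNewRecord(rec, 0L);
        System.out.println(handler.handleUnknownTopicOrPartition(rec, 500L));   // RETRY
        System.out.println(handler.handleUnknownTopicOrPartition(rec, 1_500L)); // FAIL
    }
}
```

Without something like the two hooks (or a first-send timestamp passed into the handle method), the handler has no way to know how long a record has been pending, which is the gap I'm asking about.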
Cheers,

Chris

On Tue, May 14, 2024 at 9:10 AM Alieh Saeedi <asae...@confluent.io.invalid> wrote:

> Thanks Andrew. Done :)
>
> @Chris: I changed the config parameter type from boolean to integer, which
> defines the timeout for retrying. I thought reusing `max.block.ms` was not
> reasonable as you mentioned.
>
> So if the KIP looks good, let's skip to the good part ;-) VOTING :)
>
> Bests,
> Alieh
>
> On Tue, May 14, 2024 at 12:26 PM Andrew Schofield <andrew_schofi...@live.com> wrote:
>
> > Hi Alieh,
> > Just one final comment.
> >
> > [AJS5] Existing classes use Retriable, not Retryable. For example:
> >
> > https://kafka.apache.org/21/javadoc/org/apache/kafka/common/errors/RetriableException.html
> >
> > I suggest RetriableResponse and NonRetriableResponse.
> >
> > Thanks,
> > Andrew
> >
> > > On 13 May 2024, at 23:17, Alieh Saeedi <asae...@confluent.io.INVALID> wrote:
> > >
> > > Hi all,
> > >
> > > Thanks for all the valid points you listed.
> > >
> > > KIP updates and addressing concerns:
> > >
> > > 1) The KIP now suggests two Response types: `RetryableResponse` and
> > > `NonRetryableResponse`
> > >
> > > 2) `custom.exception.handler` is changed to `custom.exception.handler.class`
> > >
> > > 3) The KIP clarifies that `In the case of an implemented handler for the
> > > specified exception, the handler takes precedence.`
> > >
> > > 4) There is now a `default` implementation for both handle() methods.
> > >
> > > 5) @Chris: for `UnknownTopicOrPartition`, the default is already retrying
> > > for 60s. (In fact, the default value of `max.block.ms`). If the handler
> > > instructs to FAIL or SWALLOW, there will be no retry, and if the handler
> > > instructs to RETRY, that will be the default behavior, which follows the
> > > values in already existing config parameters such as `max.block.ms`. Does
> > > that make sense?
> > > > > > > > > Hope the changes and explanations are convincing :) > > > > > > > > > Cheers, > > > > > > Alieh > > > > > > On Mon, May 13, 2024 at 6:40 PM Justine Olshan > > <jols...@confluent.io.invalid> > > > wrote: > > > > > >> Oh I see. The type isn't the error type but a newly defined type for > the > > >> response. Makes sense and works for me. > > >> > > >> Justine > > >> > > >> On Mon, May 13, 2024 at 9:13 AM Chris Egerton < > fearthecel...@gmail.com> > > >> wrote: > > >> > > >>> If we have dedicated methods for each kind of exception > > >>> (handleRecordTooLarge, handleUnknownTopicOrPartition, etc.), doesn't > > that > > >>> provide sufficient constraint? I'm not suggesting we eliminate these > > >>> methods, just that we change their return types to something more > > >> flexible. > > >>> > > >>> On Mon, May 13, 2024, 12:07 Justine Olshan > > <jols...@confluent.io.invalid > > >>> > > >>> wrote: > > >>> > > >>>> I'm not sure I agree with the Retriable and NonRetriableResponse > > >> comment. > > >>>> This doesn't limit the blast radius or enforce certain errors are > > used. > > >>>> I think we might disagree on how controlled these interfaces can > be... > > >>>> > > >>>> Justine > > >>>> > > >>>> On Mon, May 13, 2024 at 8:40 AM Chris Egerton > <chr...@aiven.io.invalid > > >>> > > >>>> wrote: > > >>>> > > >>>>> Hi Alieh, > > >>>>> > > >>>>> Thanks for the updates! I just have a few more thoughts: > > >>>>> > > >>>>> - I don't think a boolean property is sufficient to dictate retries > > >> for > > >>>>> unknown topic partitions, though. These errors can occur if a topic > > >> has > > >>>>> just been created, which can occur if, for example, automatic topic > > >>>>> creation is enabled for a multi-task connector. This is why I > > >> proposed > > >>> a > > >>>>> timeout instead of a boolean (and see my previous email for why > > >>> reducing > > >>>>> max.block.ms for a producer is not a viable alternative). 
If it > > >> helps, > > >>>> one > > >>>>> way to reproduce this yourself is to add the line > > >>>>> `fooProps.put(TASKS_MAX_CONFIG, "10");` to the integration test > here: > > >>>>> > > >>>>> > > >>>> > > >>> > > >> > > > https://github.com/apache/kafka/blob/5439914c32fa00d634efa7219699f1bc21add839/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java#L134 > > >>>>> and then check the logs afterward for messages like "Error while > > >>> fetching > > >>>>> metadata with correlation id <n> : > > >>>> {foo-topic=UNKNOWN_TOPIC_OR_PARTITION}". > > >>>>> > > >>>>> - I also don't think we need custom XxxResponse enums for every > > >>> possible > > >>>>> method; it seems like this will lead to a lot of duplication and > > >>>> cognitive > > >>>>> overhead if we want to expand the error handler in the future. > > >>> Something > > >>>>> more flexible like RetriableResponse and NonRetriableResponse could > > >>>>> suffice. > > >>>>> > > >>>>> - Finally, the KIP still doesn't state how the handler will or > won't > > >>> take > > >>>>> precedence over existing retry properties. If I set `retries` or ` > > >>>>> delivery.timeout.ms` or `max.block.ms` to low values, will that > > >> cause > > >>>>> retries to cease even if my custom handler would otherwise keep > > >>> returning > > >>>>> RETRY for an error? > > >>>>> > > >>>>> Cheers, > > >>>>> > > >>>>> Chris > > >>>>> > > >>>>> On Mon, May 13, 2024 at 11:02 AM Andrew Schofield < > > >>>>> andrew_schofi...@live.com> > > >>>>> wrote: > > >>>>> > > >>>>>> Hi Alieh, > > >>>>>> Just a few more comments on the KIP. It is looking much less risky > > >>> now > > >>>>> the > > >>>>>> scope > > >>>>>> is tighter. > > >>>>>> > > >>>>>> [AJS1] It would be nice to have default implementations of the > > >> handle > > >>>>>> methods > > >>>>>> so an implementor would not need to implement both themselves. 
> > >>>>>>
> > >>>>>> [AJS2] Producer configurations which are class names usually end in
> > >>>>>> “.class”. I suggest “custom.exception.handler.class”.
> > >>>>>>
> > >>>>>> [AJS3] If I implemented a handler, and I set a non-default value for one
> > >>>>>> of the new configurations, what happens? I would expect that the handler
> > >>>>>> takes precedence. I wasn’t quite clear what “the control will follow the
> > >>>>>> handler instructions” meant.
> > >>>>>>
> > >>>>>> [AJS4] Because you now have an enum for the RecordTooLargeExceptionResponse,
> > >>>>>> I don’t think you need to state in the comment for ProducerExceptionHandler
> > >>>>>> that RETRY will be interpreted as FAIL.
> > >>>>>>
> > >>>>>> Thanks,
> > >>>>>> Andrew
> > >>>>>>
> > >>>>>>> On 13 May 2024, at 14:53, Alieh Saeedi <asae...@confluent.io.INVALID> wrote:
> > >>>>>>>
> > >>>>>>> Hi all,
> > >>>>>>>
> > >>>>>>> Thanks for the very interesting discussion during my PTO.
> > >>>>>>>
> > >>>>>>> KIP updates and addressing concerns:
> > >>>>>>>
> > >>>>>>> 1) Two handle() methods are defined in ProducerExceptionHandler for the two
> > >>>>>>> exceptions with different input parameters so that we have
> > >>>>>>> handle(RecordTooLargeException e, ProducerRecord record) and
> > >>>>>>> handle(UnknownTopicOrPartitionException e, ProducerRecord record)
> > >>>>>>>
> > >>>>>>> 2) The ProducerExceptionHandler extends `Closeable` as well.
> > >>>>>>>
> > >>>>>>> 3) The KIP suggests having two more configuration parameters with boolean
> > >>>>>>> values:
> > >>>>>>>
> > >>>>>>> - `drop.invalid.large.records` with a default value of `false` for
> > >>>>>>> swallowing too large records.
> > >>>>>>> > > >>>>>>> - `retry.unknown.topic.partition` with a default value of `true` > > >>> that > > >>>>>>> performs RETRY for `max.block.ms` ms, encountering the > > >>>>>>> UnknownTopicOrPartitionException. > > >>>>>>> > > >>>>>>> > > >>>>>>> Hope the main concerns are addressed so that we can go forward > > >> with > > >>>>>> voting. > > >>>>>>> > > >>>>>>> > > >>>>>>> Cheers, > > >>>>>>> > > >>>>>>> Alieh > > >>>>>>> > > >>>>>>> On Thu, May 9, 2024 at 11:25 PM Artem Livshits > > >>>>>>> <alivsh...@confluent.io.invalid> wrote: > > >>>>>>> > > >>>>>>>> Hi Mathias, > > >>>>>>>> > > >>>>>>>>> [AL1] While I see the point, I would think having a different > > >>>>> callback > > >>>>>>>> for every exception might not really be elegant? > > >>>>>>>> > > >>>>>>>> I'm not sure how to assess the level of elegance of the > > >> proposal, > > >>>> but > > >>>>> I > > >>>>>> can > > >>>>>>>> comment on the technical characteristics: > > >>>>>>>> > > >>>>>>>> 1. Having specific interfaces that codify the logic that is > > >>>> currently > > >>>>>>>> prescribed in the comments reduce the chance of making a > > >> mistake. > > >>>>>>>> Commments may get ignored, misuderstood or etc. but if the > > >>> contract > > >>>> is > > >>>>>>>> codified, the compilier will help to enforce the contract. > > >>>>>>>> 2. Given that the logic is trickier than it seems (the > > >>>>> record-too-large > > >>>>>> is > > >>>>>>>> an example that can easily confuse someone who's not intimately > > >>>>> familiar > > >>>>>>>> with the nuances of the batching logic), having a little more > > >>> hoops > > >>>> to > > >>>>>> jump > > >>>>>>>> would give a greater chance that whoever tries to add a new > > >> cases > > >>>>> pauses > > >>>>>>>> and thinks a bit more. > > >>>>>>>> 3. As Justine pointed out, having different method will be a > > >>> forcing > > >>>>>>>> function to go through a KIP rather than smuggle new cases > > >> through > > >>>>>>>> implementation. > > >>>>>>>> 4. 
Sort of a consequence of the previous 3 -- all those things > > >>>> reduce > > >>>>>> the > > >>>>>>>> chance of someone writing the code that works with 2 errors and > > >>> then > > >>>>>> when > > >>>>>>>> more errors are added in the future will suddenly incorrectly > > >>> ignore > > >>>>> new > > >>>>>>>> errors (the example I gave in the previous email). > > >>>>>>>> > > >>>>>>>>> [AL2 cont.] Similar to AL1, I see such a handler to some extend > > >>> as > > >>>>>>>> business logic. If a user puts a bad filter condition in their > > >> KS > > >>>> app, > > >>>>>> and > > >>>>>>>> drops messages > > >>>>>>>> > > >>>>>>>> I agree that there is always a chance to get a bug and lose > > >>>> messages, > > >>>>>> but > > >>>>>>>> there are generally separation of concerns that has different > > >> risk > > >>>>>> profile: > > >>>>>>>> the filtering logic may be more rigorously tested and rarely > > >>> changed > > >>>>>> (say > > >>>>>>>> an application developer does it), but setting the topics to > > >>> produce > > >>>>>> may be > > >>>>>>>> done via configuration (e.g. a user of the application does it) > > >>> and > > >>>>> it's > > >>>>>>>> generally an expectation that users would get an error when > > >>>>>> configuration > > >>>>>>>> is incorrect. > > >>>>>>>> > > >>>>>>>> What could be worse is that UnknownTopicOrPartitionException can > > >>> be > > >>>> an > > >>>>>>>> intermittent error, i.e. with a generally correct configuration, > > >>>> there > > >>>>>>>> could be metadata propagation problem on the cluster and then a > > >>>> random > > >>>>>> set > > >>>>>>>> of records could get lost. > > >>>>>>>> > > >>>>>>>>> [AL3] Maybe I misunderstand what you are saying, but to me, > > >>>> checking > > >>>>>> the > > >>>>>>>> size of the record upfront is exactly what the KIP proposes? No? > > >>>>>>>> > > >>>>>>>> It achieves the same result but solves it differently, my > > >>> proposal: > > >>>>>>>> > > >>>>>>>> 1. 
Application checks the validity of a record (maybe via a new > > >>>>>>>> validateRecord method) before producing it, and can just exclude > > >>> it > > >>>> or > > >>>>>>>> return an error to the user. > > >>>>>>>> 2. Application produces the record -- at this point there are no > > >>>>> records > > >>>>>>>> that could return record too large, they were either skipped at > > >>>> step 1 > > >>>>>> or > > >>>>>>>> we didn't get here because step 1 failed. > > >>>>>>>> > > >>>>>>>> Vs. KIP's proposal > > >>>>>>>> > > >>>>>>>> 1. Application produces the record. > > >>>>>>>> 2. Application gets a callback. > > >>>>>>>> 3. Application returns the action on how to proceed. > > >>>>>>>> > > >>>>>>>> The advantage of the former is the clarity of semantics -- the > > >>>> record > > >>>>> is > > >>>>>>>> invalid (property of the record, not a function of server state > > >> or > > >>>>>> server > > >>>>>>>> configuration) and we can clearly know that it is the record > > >> that > > >>> is > > >>>>> bad > > >>>>>>>> and can never succeed. > > >>>>>>>> > > >>>>>>>> The KIP-proposed way actually has a very tricky point: it > > >> actually > > >>>>>> handles > > >>>>>>>> a subset of record-too-large exceptions. The broker can return > > >>>>>>>> record-too-large and reject the whole batch (but we don't want > > >> to > > >>>>> ignore > > >>>>>>>> those because then we can skip random records that just happened > > >>> to > > >>>> be > > >>>>>> in > > >>>>>>>> the same batch), in some sense we use the same error for 2 > > >>> different > > >>>>>>>> conditions and understanding that requires pretty deep > > >>> understanding > > >>>>> of > > >>>>>>>> Kafka internals. 
> > >>>>>>>> > > >>>>>>>> -Artem > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> On Wed, May 8, 2024 at 9:47 AM Justine Olshan > > >>>>>> <jols...@confluent.io.invalid > > >>>>>>>>> > > >>>>>>>> wrote: > > >>>>>>>> > > >>>>>>>>> My concern with respect to it being fragile: the code that > > >>> ensures > > >>>>> the > > >>>>>>>>> error type is internal to the producer. Someone may see it and > > >>>> say, I > > >>>>>>>> want > > >>>>>>>>> to add such and such error. This looks like internal code, so I > > >>>> don't > > >>>>>>>> need > > >>>>>>>>> a KIP, and then they can change it to whatever they want > > >> thinking > > >>>> it > > >>>>> is > > >>>>>>>>> within the typical kafka improvement protocol. > > >>>>>>>>> > > >>>>>>>>> Relying on an internal change to enforce an external API is > > >>> fragile > > >>>>> in > > >>>>>> my > > >>>>>>>>> opinion. That's why I sort of agreed with Artem with enforcing > > >>> the > > >>>>>> error > > >>>>>>>> in > > >>>>>>>>> the method signature -- part of the public API. > > >>>>>>>>> > > >>>>>>>>> Chris's comments on requiring more information to handler again > > >>>> makes > > >>>>>> me > > >>>>>>>>> wonder if we are solving a problem of lack of information at > > >> the > > >>>>>>>>> application level with a more powerful solution than we need. > > >>> (Ie, > > >>>> if > > >>>>>> we > > >>>>>>>>> had more information, could the application close and restart > > >> the > > >>>>>>>>> transaction rather than having to drop records) But I am happy > > >> to > > >>>>>>>>> compromise with a handler that we can agree is sufficiently > > >>>>> controlled > > >>>>>>>> and > > >>>>>>>>> documented. 
> > >>>>>>>>> > > >>>>>>>>> Justine > > >>>>>>>>> > > >>>>>>>>> On Wed, May 8, 2024 at 7:20 AM Chris Egerton > > >>>> <chr...@aiven.io.invalid > > >>>>>> > > >>>>>>>>> wrote: > > >>>>>>>>> > > >>>>>>>>>> Hi Alieh, > > >>>>>>>>>> > > >>>>>>>>>> Continuing prior discussions: > > >>>>>>>>>> > > >>>>>>>>>> 1) Regarding the "flexibility" discussion, my overarching > > >> point > > >>> is > > >>>>>>>> that I > > >>>>>>>>>> don't see the point in allowing for this kind of pluggable > > >> logic > > >>>>>>>> without > > >>>>>>>>>> also covering more scenarios. Take example 2 in the KIP: if > > >>> we're > > >>>>>> going > > >>>>>>>>> to > > >>>>>>>>>> implement retries only on "important" topics when a topic > > >>>> partition > > >>>>>>>> isn't > > >>>>>>>>>> found, why wouldn't we also want to be able to do this for > > >> other > > >>>>>>>> errors? > > >>>>>>>>>> Again, taking authorization errors as an example, why wouldn't > > >>> we > > >>>>> want > > >>>>>>>> to > > >>>>>>>>>> be able to fail when we can't write to "important" topics > > >>> because > > >>>>> the > > >>>>>>>>>> producer principal lacks sufficient ACLs, and drop the record > > >> if > > >>>> the > > >>>>>>>>> topic > > >>>>>>>>>> isn't "important"? In a security-conscious environment with > > >>>>>>>>>> runtime-dependent topic routing (which is a common feature of > > >>> many > > >>>>>>>> source > > >>>>>>>>>> connectors, such as the Debezium connectors), this seems > > >> fairly > > >>>>>> likely. > > >>>>>>>>>> > > >>>>>>>>>> 2) As far as changing the shape of the API goes, I like > > >> Artem's > > >>>> idea > > >>>>>> of > > >>>>>>>>>> splitting out the interface based on specific exceptions. 
This > > >>> may > > >>>>> be > > >>>>>> a > > >>>>>>>>>> little laborious to expand in the future, but if we really > > >> want > > >>> to > > >>>>>>>>>> limit the exceptions that we cover with the handler and move > > >>>> slowly > > >>>>>> and > > >>>>>>>>>> cautiously, then IMO it'd be reasonable to reflect that in the > > >>>>>>>>> interface. I > > >>>>>>>>>> also acknowledge that there's no way to completely prevent > > >>> people > > >>>>> from > > >>>>>>>>>> shooting themselves in the foot by implementing the API > > >>>> incorrectly, > > >>>>>>>> but > > >>>>>>>>> I > > >>>>>>>>>> think it's worth it to do what we can--including leveraging > > >> the > > >>>> Java > > >>>>>>>>>> language's type system--to help them, so IMO there's value to > > >>>>>>>> eliminating > > >>>>>>>>>> the implicit behavior of failing when a policy returns RETRY > > >>> for a > > >>>>>>>>>> non-retriable error. This can take a variety of shapes and I'm > > >>> not > > >>>>>>>> going > > >>>>>>>>> to > > >>>>>>>>>> insist on anything specific, but I do want to again raise my > > >>>>> concerns > > >>>>>>>>> with > > >>>>>>>>>> the current proposal and request that we find something a > > >> little > > >>>>>>>> better. > > >>>>>>>>>> > > >>>>>>>>>> 3) Concerning the default implementation--actually, I meant > > >>> what I > > >>>>>>>> wrote > > >>>>>>>>> :) > > >>>>>>>>>> I don't want a "second" default, I want an implementation of > > >>> this > > >>>>>>>>> interface > > >>>>>>>>>> to be used as the default if no others are specified. The > > >>> behavior > > >>>>> of > > >>>>>>>>> this > > >>>>>>>>>> default implementation would be identical to existing behavior > > >>> (so > > >>>>>>>> there > > >>>>>>>>>> would be no backwards compatibility concerns like the ones > > >>> raised > > >>>> by > > >>>>>>>>>> Matthias), but it would be possible to configure this default > > >>>>> handler > > >>>>>>>>> class > > >>>>>>>>>> to behave differently for a basic set of scenarios. 
This would > > >>>>> mirror > > >>>>>>>>> (pun > > >>>>>>>>>> intended) the approach we've taken with Mirror Maker 2 and its > > >>>>>>>>>> ReplicationPolicy interface [1]. There is a default > > >>> implementation > > >>>>>>>>>> available [2] that recognizes a handful of basic configuration > > >>>>>>>> properties > > >>>>>>>>>> [3] for simple tweaks, but if users want, they can also > > >>> implement > > >>>>>> their > > >>>>>>>>> own > > >>>>>>>>>> replication policy for more fine-grained logic if those > > >>> properties > > >>>>>>>> aren't > > >>>>>>>>>> flexible enough. > > >>>>>>>>>> > > >>>>>>>>>> More concretely, I'm imagining something like this for the > > >>>> producer > > >>>>>>>>>> exception handler: > > >>>>>>>>>> > > >>>>>>>>>> - Default implementation class > > >>>>>>>>>> of > > >>>> org.apache.kafka.clients.producer.DefaultProducerExceptionHandler > > >>>>>>>>>> - This class would recognize two properties: > > >>>>>>>>>> - drop.invalid.large.records: Boolean property, defaults to > > >>>> false. > > >>>>> If > > >>>>>>>>>> "false", then causes the handler to return FAIL whenever > > >>>>>>>>>> a RecordTooLargeException is encountered; if "true", then > > >> causes > > >>>>>>>>>> SWALLOW/SKIP/DROP to be returned instead > > >>>>>>>>>> - unknown.topic.partition.retry.timeout.ms: Integer > > >> property, > > >>>>>>>> defaults > > >>>>>>>>>> to > > >>>>>>>>>> INT_MAX. 
Whenever an UnknownTopicOrPartitionException is > > >>>>> encountered, > > >>>>>>>>>> causes the handler to return FAIL if that record has been > > >>> pending > > >>>>> for > > >>>>>>>>> more > > >>>>>>>>>> than the retry timeout; otherwise, causes RETRY to be returned > > >>>>>>>>>> > > >>>>>>>>>> I think this is worth addressing now instead of later because > > >> it > > >>>>>> forces > > >>>>>>>>> us > > >>>>>>>>>> to evaluate the usefulness of this interface and it addresses > > >> a > > >>>>>>>>>> long-standing issue not just with Kafka Connect, but with the > > >>> Java > > >>>>>>>>> producer > > >>>>>>>>>> in general. For reference, here are a few tickets I collected > > >>>> after > > >>>>>>>>> briefly > > >>>>>>>>>> skimming our Jira showing that this is a real pain point for > > >>>> users: > > >>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-10340, > > >>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-12990, > > >>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-13634. Although > > >>> this > > >>>> is > > >>>>>>>>>> frequently reported with Kafka Connect, it applies to anyone > > >> who > > >>>>>>>>> configures > > >>>>>>>>>> a producer to use a high retry timeout. I am aware of the > > >>>>>> max.block.ms > > >>>>>>>>>> property, but it's painful and IMO poor behavior to require > > >>> users > > >>>> to > > >>>>>>>>> reduce > > >>>>>>>>>> the value of this property just to handle the single scenario > > >>> when > > >>>>>>>> trying > > >>>>>>>>>> to write to topics that don't exist, since it would also limit > > >>> the > > >>>>>>>> retry > > >>>>>>>>>> timeout for other operations that are legitimately retriable. > > >>>>>>>>>> > > >>>>>>>>>> Raising new points: > > >>>>>>>>>> > > >>>>>>>>>> 5) I don't see the interplay between this handler and existing > > >>>>>>>>>> retry-related properties mentioned anywhere in the KIP. 
I'm > > >>>> assuming > > >>>>>>>> that > > >>>>>>>>>> properties like "retries", "max.block.ms", and " > > >>>> delivery.timeout.ms > > >>>>> " > > >>>>>>>>> would > > >>>>>>>>>> take precedence over the handler and once they are exhausted, > > >>> the > > >>>>>>>>>> record/batch will fail no matter what? If so, it's probably > > >>> worth > > >>>>>>>> briefly > > >>>>>>>>>> mentioning this (no more than a sentence or two) in the KIP, > > >> and > > >>>> if > > >>>>>>>> not, > > >>>>>>>>>> I'm curious what you have in mind. > > >>>>>>>>>> > > >>>>>>>>>> 6) I also wonder if the API provides enough information in its > > >>>>> current > > >>>>>>>>>> form. Would it be possible to provide handlers with some way > > >> of > > >>>>>>>> tracking > > >>>>>>>>>> how long a record has been pending for (i.e., how long it's > > >> been > > >>>>> since > > >>>>>>>>> the > > >>>>>>>>>> record was provided as an argument to Producer::send)? One way > > >>> to > > >>>> do > > >>>>>>>> this > > >>>>>>>>>> could be to add a method like `onNewRecord(ProducerRecord)` > > >> and > > >>>>>>>>>> allow/require the handler to do its own bookkeeping, probably > > >>>> with a > > >>>>>>>>>> matching `onRecordSuccess(ProducerRecord)` method so that the > > >>>>> handler > > >>>>>>>>>> doesn't eat up an ever-increasing amount of memory trying to > > >>> track > > >>>>>>>>> records. > > >>>>>>>>>> An alternative could be to include information about the > > >> initial > > >>>>> time > > >>>>>>>> the > > >>>>>>>>>> record was received by the producer and the number of retries > > >>> that > > >>>>>> have > > >>>>>>>>>> been performed for it as parameters in the handle method(s), > > >> but > > >>>> I'm > > >>>>>>>> not > > >>>>>>>>>> sure how easy this would be to implement and it might clutter > > >>>> things > > >>>>>>>> up a > > >>>>>>>>>> bit too much. 
> > >>>>>>>>>> > > >>>>>>>>>> 7) A small request--can we add Closeable (or, if you prefer, > > >>>>>>>>> AutoCloseable) > > >>>>>>>>>> as a superinterface for the handler interface? > > >>>>>>>>>> > > >>>>>>>>>> [1] - > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>> > > >>>>>>>> > > >>>>>> > > >>>>> > > >>>> > > >>> > > >> > > > https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/ReplicationPolicy.html > > >>>>>>>>>> [2] - > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>> > > >>>>>>>> > > >>>>>> > > >>>>> > > >>>> > > >>> > > >> > > > https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html > > >>>>>>>>>> [3] - > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>> > > >>>>>>>> > > >>>>>> > > >>>>> > > >>>> > > >>> > > >> > > > https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#SEPARATOR_CONFIG > > >>>>>>>>>> , > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>> > > >>>>>>>> > > >>>>>> > > >>>>> > > >>>> > > >>> > > >> > > > https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG > > >>>>>>>>>> > > >>>>>>>>>> Cheers, > > >>>>>>>>>> > > >>>>>>>>>> Chris > > >>>>>>>>>> > > >>>>>>>>>> On Wed, May 8, 2024 at 12:37 AM Matthias J. Sax < > > >>> mj...@apache.org > > >>>>> > > >>>>>>>>> wrote: > > >>>>>>>>>> > > >>>>>>>>>>> Very interesting discussion. > > >>>>>>>>>>> > > >>>>>>>>>>> Seems a central point is the question about "how generic" we > > >>>>> approach > > >>>>>>>>>>> this, and some people think we need to be conservative and > > >>> others > > >>>>>>>> think > > >>>>>>>>>>> we should try to be as generic as possible. > > >>>>>>>>>>> > > >>>>>>>>>>> Personally, I think following a limited scope for this KIP > > >> (by > > >>>>>>>>>>> explicitly saying we only cover RecordTooLarge and > > >>>>>>>>>>> UnknownTopicOrPartition) might be better. 
We have concrete > > >>>> tickets > > >>>>>>>> that > > >>>>>>>>>>> we address, while for other exception (like authorization) we > > >>>> don't > > >>>>>>>>> know > > >>>>>>>>>>> if people want to handle it to begin with. Boiling the ocean > > >>>> might > > >>>>>>>> not > > >>>>>>>>>>> get us too far, and being somewhat pragmatic might help to > > >> move > > >>>>> this > > >>>>>>>>> KIP > > >>>>>>>>>>> forward. -- I also agree with Justin and Artem, that we want > > >> to > > >>>> be > > >>>>>>>>>>> careful anyway to not allow users to break stuff too easily. > > >>>>>>>>>>> > > >>>>>>>>>>> As the same time, I agree that we should setup this change > > >> in a > > >>>>>>>> forward > > >>>>>>>>>>> looking way, and thus having a single generic handler allows > > >> us > > >>>> to > > >>>>>>>>> later > > >>>>>>>>>>> extend the handler more easily. This should also simplify > > >>> follow > > >>>> up > > >>>>>>>> KIP > > >>>>>>>>>>> that might add new error cases (I actually mentioned one more > > >>> to > > >>>>>>>> Alieh > > >>>>>>>>>>> already, but we both agreed that it might be best to exclude > > >> it > > >>>>> from > > >>>>>>>>> the > > >>>>>>>>>>> KIP right now, to make the 3.8 deadline. Doing a follow up > > >> KIP > > >>> is > > >>>>> not > > >>>>>>>>>>> the end of the world.) > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> @Chris: > > >>>>>>>>>>> > > >>>>>>>>>>> (2) This sounds fair to me, but not sure how "bad" it > > >> actually > > >>>>> would > > >>>>>>>>> be? > > >>>>>>>>>>> If the contract is clearly defined, it seems to be fine what > > >>> the > > >>>>> KIP > > >>>>>>>>>>> proposes, and given that such a handler is an expert API, and > > >>> we > > >>>>> can > > >>>>>>>>>>> provide "best practices" (cf my other comment below in > > >> [AL1]), > > >>>>> being > > >>>>>>>> a > > >>>>>>>>>>> little bit pragmatic sound fine to me. > > >>>>>>>>>>> > > >>>>>>>>>>> Not sure if I understand Justin's argument on this question? 
> > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> (3) About having a default impl or not. I am fine with adding > > >>>> one, > > >>>>>>>> even > > >>>>>>>>>>> if I am not sure why Connect could not just add its own one > > >> and > > >>>>> plug > > >>>>>>>> it > > >>>>>>>>>>> in (and we would add corresponding configs for Connect, but > > >> not > > >>>> for > > >>>>>>>> the > > >>>>>>>>>>> producer itself)? For this case, we could also do this as a > > >>>> follow > > >>>>> up > > >>>>>>>>>>> KIP, but happy to include it in this KIP to provide value to > > >>>>> Connect > > >>>>>>>>>>> right away (even if the value might not come right away if we > > >>>> miss > > >>>>>>>> the > > >>>>>>>>>>> 3.8 deadline due to expanded KIP scope...) -- For KS, we > > >> would > > >>>> for > > >>>>>>>>> sure > > >>>>>>>>>>> plugin our own impl, and lock down the config such that users > > >>>>> cannot > > >>>>>>>>> set > > >>>>>>>>>>> their own handler on the internal producer to begin with. > > >> Might > > >>>> be > > >>>>>>>> good > > >>>>>>>>>>> to elaborate why the producer should have a default? We might > > >>>>>>>> actually > > >>>>>>>>>>> want to add this to the KIP right away? > > >>>>>>>>>>> > > >>>>>>>>>>> The key for a default impl would be, to not change the > > >> current > > >>>>>>>>> behavior, > > >>>>>>>>>>> and having no default seems to achieve this. For the two > > >> cases > > >>>> you > > >>>>>>>>>>> mentioned, it's unclear to me what default value on "upper > > >>> bound > > >>>> on > > >>>>>>>>>>> retires" for UnkownTopicOrPartitionException we should set? > > >>> Seems > > >>>>> it > > >>>>>>>>>>> would need to be the same as `delivery.timeout.ms`? However, > > >>> if > > >>>>>>>> users > > >>>>>>>>>>> have `delivery.timeout.ms` actually overwritten we would > > >> need > > >>> to > > >>>>> set > > >>>>>>>>>>> this config somewhat "dynamic"? Is this feasible? If we > > >>>> hard-code 2 > > >>>>>>>>>>> minutes, it might not be backward compatible. 
I have the > > >>>> impression > > >>>>>>>> we > > >>>>>>>>>>> might introduce some undesired coupling? -- For the "record > > >> too > > >>>>>>>> large" > > >>>>>>>>>>> case, the config seems to be boolean and setting it to > > >> `false` > > >>> by > > >>>>>>>>>>> default seems to provide backward compatibility. > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> @Artem: > > >>>>>>>>>>> > > >>>>>>>>>>> [AL1] While I see the point, I would think having a different > > >>>>>>>> callback > > >>>>>>>>>>> for every exception might not really be elegant? In the end, > > >>> the > > >>>>>>>>> handler > > >>>>>>>>>>> is an very advanced feature anyway, and if it's implemented > > >> in > > >>> a > > >>>>> bad > > >>>>>>>>>>> way, well, it's a user error -- we cannot protect users from > > >>>>>>>>> everything. > > >>>>>>>>>>> To me, a handler like this, is to some extend "business > > >> logic" > > >>>> and > > >>>>>>>> if a > > >>>>>>>>>>> user gets business logic wrong, it's hard to protect them. -- > > >>> We > > >>>>>>>> would > > >>>>>>>>>>> of course provide best practice guidance in the JaveDocs, and > > >>>>> explain > > >>>>>>>>>>> that a handler should have explicit `if` statements for stuff > > >>> it > > >>>>> want > > >>>>>>>>> to > > >>>>>>>>>>> handle, and only a single default which return FAIL. > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> [AL2] Yes, but for KS we would retry at the application > > >> layer. > > >>>> Ie, > > >>>>>>>> the > > >>>>>>>>>>> TX is not completed, we clean up and setup out task from > > >>> scratch, > > >>>>> to > > >>>>>>>>>>> ensure the pending transaction is completed before we resume. > > >>> If > > >>>>> the > > >>>>>>>> TX > > >>>>>>>>>>> was indeed aborted, we would retry from older offset and thus > > >>>> just > > >>>>>>>> hit > > >>>>>>>>>>> the same error again and the loop begins again. > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> [AL2 cont.] 
> > >>>>>>>>>>> Similar to AL1, I see such a handler to some extent as business logic. If a user puts a bad filter condition in their KS app, and drops messages, there is nothing we can do about it, and this handler, IMHO, has a similar purpose. This is also the line of thinking I apply to EOS, to address Justine's concern about "should we allow to drop for EOS", and my answer is "yes", because it's more business logic than actual error handling IMHO. And by default, we fail... So users opt in to add business logic to drop records. It's an application level decision how to write the code.
> > >>>>>>>>>>>
> > >>>>>>>>>>> [AL3] Maybe I misunderstand what you are saying, but to me, checking the size of the record upfront is exactly what the KIP proposes? No?
> > >>>>>>>>>>>
> > >>>>>>>>>>> @Justine:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> I saw the sample code -- is it just an if statement checking for the error before the handler is invoked? That seems a bit fragile.
> > >>>>>>>>>>>
> > >>>>>>>>>>> What do you mean by fragile? Not sure if I see your point.
> > >>>>>>>>>>>
> > >>>>>>>>>>> -Matthias
> > >>>>>>>>>>>
> > >>>>>>>>>>> On 5/7/24 5:33 PM, Artem Livshits wrote:
> > >>>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Thanks for the KIP. The motivation talks about very specific cases, but the interface is generic.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> [AL1]
> > >>>>>>>>>>>> If the interface evolves in the future I think we could have the following confusion:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 1. A user implemented SWALLOW action for both RecordTooLargeException and UnknownTopicOrPartitionException. For simplicity they just return SWALLOW from the function, because it elegantly handles all known cases.
> > >>>>>>>>>>>> 2. The interface has evolved to support a new exception.
> > >>>>>>>>>>>> 3. The user has upgraded their Kafka client.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Now a new kind of error gets dropped on the floor without the user's intention and it would be super hard to detect and debug.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> To avoid the confusion, I think we should use handlers for specific exceptions. Then if a new exception is added it won't get silently swallowed because the user would need to add new functionality to handle it.
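The [AL1] scenario, and the "explicit `if` statements plus a single default that returns FAIL" guidance given in reply, can be illustrated with a small sketch. Everything below is hypothetical: the exception classes, the `Response` enum, and the `ExceptionHandler` interface are local stand-ins written for this example, not the Kafka client classes or the interface proposed in the KIP.

```java
// Hypothetical stand-ins for illustration only; NOT the Kafka client classes.
class RecordTooLargeException extends RuntimeException {}
class UnknownTopicOrPartitionException extends RuntimeException {}
class SomeFutureException extends RuntimeException {}

// Hypothetical response type, loosely following the FAIL/RETRY/SWALLOW options in the thread.
enum Response { FAIL, RETRY, SWALLOW }

// Hypothetical generic, single-method handler shape.
interface ExceptionHandler {
    Response handle(String topic, Exception error);
}

// The pattern [AL1] warns about: every error is swallowed,
// including exception types the interface starts passing in later.
class SwallowEverythingHandler implements ExceptionHandler {
    @Override
    public Response handle(String topic, Exception error) {
        return Response.SWALLOW;
    }
}

// Explicit checks for the cases the user means to handle,
// and one default that fails for anything else.
class ExplicitHandler implements ExceptionHandler {
    @Override
    public Response handle(String topic, Exception error) {
        if (error instanceof RecordTooLargeException) {
            return Response.SWALLOW;
        }
        if (error instanceof UnknownTopicOrPartitionException) {
            return Response.RETRY;
        }
        return Response.FAIL; // unknown or newly added exception types are not dropped silently
    }
}
```

With `SwallowEverythingHandler`, a `SomeFutureException` would be dropped after a client upgrade; with `ExplicitHandler` it falls through to FAIL and stays visible.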
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I also have some higher level comments:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> [AL2]
> > >>>>>>>>>>>>> it throws a TimeoutException, and the user can only blindly retry, which may result in an infinite retry loop
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> If the TimeoutException happens during transactional processing (exactly once is the desired semantics), then the client should not retry when it gets TimeoutException because without knowing the reason for TimeoutExceptions, the client cannot know whether the message got actually produced or not and retrying the message may result in duplicates.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> The thrown TimeoutException "cuts" the connection to the underlying root cause of missing metadata
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Maybe we should fix the error handling and return the proper underlying message? Then the application can properly handle the message based on preferences.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> From the product perspective, it's not clear how safe it is to blindly ignore UnknownTopicOrPartitionException. This could lead to situations when a simple typo could lead to massive data loss (part of the data would effectively be produced to a "black hole" and the user may not notice it for a while).
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> In which situations would you recommend the user to "black hole" messages in case of misconfiguration?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> [AL3]
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> If the custom handler decides on SWALLOW for RecordTooLargeException,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Is my understanding correct that this KIP proposes functionality that would only be able to SWALLOW a RecordTooLargeException that happens because the producer cannot produce the record (if the broker rejects the batch, the error won't get to the handler, because we cannot know which other records get ignored)? In this case, why not just check the locally configured max record size upfront and not produce the record in the first place? Maybe we can expose a validation function from the producer that could validate the records locally, so we don't need to produce the record in order to know that it's invalid.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> -Artem
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Tue, May 7, 2024 at 2:07 PM Justine Olshan <jols...@confluent.io.invalid> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Alieh and Chris,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Thanks for clarifying 1) but I saw the motivation. I guess I just didn't understand how that would be ensured on the producer side. I saw the sample code -- is it just an if statement checking for the error before the handler is invoked? That seems a bit fragile.
> > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Can you clarify what you mean by `since the code does not > > >>> reach > > >>>>>>>> the > > >>>>>>>>> KS > > >>>>>>>>>>>>> interface and breaks somewhere in producer.` If we surfaced > > >>>> this > > >>>>>>>>> error > > >>>>>>>>>>> to > > >>>>>>>>>>>>> the application in a better way would that also be a > > >> solution > > >>>> to > > >>>>>>>> the > > >>>>>>>>>>> issue? > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Justine > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> On Tue, May 7, 2024 at 1:55 PM Alieh Saeedi > > >>>>>>>>>>> <asae...@confluent.io.invalid> > > >>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>> > > >>>>>>>>>>>>>> Hi, > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> Thank you, Chris and Justine, for the feedback. > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> @Chris > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> 1) Flexibility: it has two meanings. The first meaning is > > >>> the > > >>>>> one > > >>>>>>>>> you > > >>>>>>>>>>>>>> mentioned. We are going to cover more exceptions in the > > >>>> future, > > >>>>>>>> but > > >>>>>>>>>> as > > >>>>>>>>>>>>>> Justine mentioned, we must be very conservative about > > >> adding > > >>>>> more > > >>>>>>>>>>>>>> exceptions. Additionally, flexibility mainly means that > > >> the > > >>>> user > > >>>>>>>> is > > >>>>>>>>>>> able > > >>>>>>>>>>>>> to > > >>>>>>>>>>>>>> develop their own code. As mentioned in the motivation > > >>> section > > >>>>>>>> and > > >>>>>>>>>> the > > >>>>>>>>>>>>>> examples, sometimes the user decides on dropping a record > > >>>> based > > >>>>>>>> on > > >>>>>>>>>> the > > >>>>>>>>>>>>>> topic, for example. 
> > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> 2) Defining two separate methods for retriable and > > >>>> non-retriable > > >>>>>>>>>>>>>> exceptions: although the idea is brilliant, the user may > > >>> still > > >>>>>>>>> make a > > >>>>>>>>>>>>>> mistake by implementing the wrong method and see a > > >>>> non-expecting > > >>>>>>>>>>>>> behaviour. > > >>>>>>>>>>>>>> For example, he may implement handleRetriable() for > > >>>>>>>>>>>>> RecordTooLargeException > > >>>>>>>>>>>>>> and define SWALLOW for the exception, but in practice, he > > >>> sees > > >>>>> no > > >>>>>>>>>>> change > > >>>>>>>>>>>>> in > > >>>>>>>>>>>>>> default behaviour since he implemented the wrong method. I > > >>>> think > > >>>>>>>> we > > >>>>>>>>>> can > > >>>>>>>>>>>>>> never reduce the user’s mistakes to 0. > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> 3) Default implementation for Handler: the default > > >> behaviour > > >>>> is > > >>>>>>>>>> already > > >>>>>>>>>>>>>> preserved with NO need of implementing any handler or > > >>> setting > > >>>>> the > > >>>>>>>>>>>>>> corresponding config parameter `custom.exception.handler`. > > >>>> What > > >>>>>>>> you > > >>>>>>>>>>> mean > > >>>>>>>>>>>>> is > > >>>>>>>>>>>>>> actually having a second default, which requires having > > >> both > > >>>>>>>>>> interface > > >>>>>>>>>>>>> and > > >>>>>>>>>>>>>> config parameters. About UnknownTopicOrPartitionException: > > >>> the > > >>>>>>>>>> producer > > >>>>>>>>>>>>>> already offers the config parameter `max.block.ms` which > > >>>>>>>>> determines > > >>>>>>>>>>> the > > >>>>>>>>>>>>>> duration of retrying. The main purpose of the user who > > >> needs > > >>>> the > > >>>>>>>>>>> handler > > >>>>>>>>>>>>> is > > >>>>>>>>>>>>>> to get the root cause of TimeoutException and handle it in > > >>> the > > >>>>>>>> way > > >>>>>>>>> he > > >>>>>>>>>>>>>> intends. The KIP explains the necessity of it for KS > > >> users. 
> > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> 4) Naming issue: By SWALLOW, we meant actually swallow the > > >>>>> error, > > >>>>>>>>>> while > > >>>>>>>>>>>>>> SKIP means skip the record, I think. If it makes sense for > > >>>> more > > >>>>>>>>> ppl, > > >>>>>>>>>> I > > >>>>>>>>>>>>> can > > >>>>>>>>>>>>>> change it to SKIP > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> @Justine > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> 1) was addressed by Chris. > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> 2 and 3) The problem is exactly what you mentioned. > > >>> Currently, > > >>>>>>>>> there > > >>>>>>>>>> is > > >>>>>>>>>>>>> no > > >>>>>>>>>>>>>> way to handle these issues application-side. Even KS users > > >>> who > > >>>>>>>>>>> implement > > >>>>>>>>>>>>> KS > > >>>>>>>>>>>>>> ProductionExceptionHandler are not able to handle the > > >>>> exceptions > > >>>>>>>> as > > >>>>>>>>>>> they > > >>>>>>>>>>>>>> intend since the code does not reach the KS interface and > > >>>> breaks > > >>>>>>>>>>>>> somewhere > > >>>>>>>>>>>>>> in producer. > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> Cheers, > > >>>>>>>>>>>>>> Alieh > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> On Tue, May 7, 2024 at 8:43 PM Chris Egerton < > > >>>>>>>>>> fearthecel...@gmail.com> > > >>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Hi Justine, > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> The method signatures for the interface are indeed > > >>>> open-ended, > > >>>>>>>> but > > >>>>>>>>>> the > > >>>>>>>>>>>>>> KIP > > >>>>>>>>>>>>>>> states that its uses will be limited. See the motivation > > >>>>>>>> section: > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> We believe that the user should be able to develop > > >> custom > > >>>>>>>>> exception > > >>>>>>>>>>>>>>> handlers for managing producer exceptions. 
On the other > > >>> hand, > > >>>>>>>> this > > >>>>>>>>>>> will > > >>>>>>>>>>>>>> be > > >>>>>>>>>>>>>>> an expert-level API, and using that may result in strange > > >>>>>>>>> behaviour > > >>>>>>>>>> in > > >>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>> system, making it hard to find the root cause. Therefore, > > >>> the > > >>>>>>>>> custom > > >>>>>>>>>>>>>>> handler is currently limited to handling > > >>>>> RecordTooLargeException > > >>>>>>>>> and > > >>>>>>>>>>>>>>> UnknownTopicOrPartitionException. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Cheers, > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Chris > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> On Tue, May 7, 2024, 14:37 Justine Olshan > > >>>>>>>>>>> <jols...@confluent.io.invalid > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> Hi Alieh, > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> I was out for KSB and then was also sick. :( > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> To your point 1) Chris, I don't think it is limited to > > >> two > > >>>>>>>>> specific > > >>>>>>>>>>>>>>>> scenarios, since the interface accepts a generic > > >>> Exception e > > >>>>>>>> and > > >>>>>>>>>> can > > >>>>>>>>>>>>> be > > >>>>>>>>>>>>>>>> implemented to check if that e is an instanceof any > > >>>> exception. > > >>>>>>>> I > > >>>>>>>>>>>>> didn't > > >>>>>>>>>>>>>>> see > > >>>>>>>>>>>>>>>> anywhere that specific errors are enforced. I'm a bit > > >>>>> concerned > > >>>>>>>>>> about > > >>>>>>>>>>>>>>> this > > >>>>>>>>>>>>>>>> actually. I'm concerned about the opened-endedness and > > >> the > > >>>>>>>>> contract > > >>>>>>>>>>>>> we > > >>>>>>>>>>>>>>> have > > >>>>>>>>>>>>>>>> with transactions. We are allowing the client to make > > >>>>> decisions > > >>>>>>>>>> that > > >>>>>>>>>>>>>> are > > >>>>>>>>>>>>>>>> somewhat invisible to the server. 
As an aside, can we > > >>> build > > >>>> in > > >>>>>>>>> log > > >>>>>>>>>>>>>>> messages > > >>>>>>>>>>>>>>>> when the handler decides to skip etc a message. I'm > > >> really > > >>>>>>>>>> concerned > > >>>>>>>>>>>>>>> about > > >>>>>>>>>>>>>>>> messages being silently dropped. > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> I do think Chris's point 2) about retriable vs non > > >>> retriable > > >>>>>>>>> errors > > >>>>>>>>>>>>> is > > >>>>>>>>>>>>>>>> fair. I'm a bit concerned about skipping a unknown topic > > >>> or > > >>>>>>>>>> partition > > >>>>>>>>>>>>>>>> exception too early, as there are cases where it can be > > >>>>>>>>> transient. > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> I'm still a little bit wary of allowing dropping records > > >>> as > > >>>>>>>> part > > >>>>>>>>> of > > >>>>>>>>>>>>> EOS > > >>>>>>>>>>>>>>>> generally as in many cases, these errors signify an > > >> issue > > >>>> with > > >>>>>>>>> the > > >>>>>>>>>>>>>>> original > > >>>>>>>>>>>>>>>> data. I understand that streams and connect/mirror maker > > >>> may > > >>>>>>>> have > > >>>>>>>>>>>>>> reasons > > >>>>>>>>>>>>>>>> they want to progress past these messages, but wondering > > >>> if > > >>>>>>>> there > > >>>>>>>>>> is > > >>>>>>>>>>>>> a > > >>>>>>>>>>>>>>> way > > >>>>>>>>>>>>>>>> that can be done application-side. I'm willing to accept > > >>>> this > > >>>>>>>>> sort > > >>>>>>>>>> of > > >>>>>>>>>>>>>>>> proposal if we can make it clear that this sort of thing > > >>> is > > >>>>>>>>>> happening > > >>>>>>>>>>>>>> and > > >>>>>>>>>>>>>>>> we limit the blast radius for what we can do. > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> Justine > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> On Tue, May 7, 2024 at 9:55 AM Chris Egerton > > >>>>>>>>>> <chr...@aiven.io.invalid > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Hi Alieh, > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Sorry for the delay, I've been out sick. 
I still have > > >>> some > > >>>>>>>>>> thoughts > > >>>>>>>>>>>>>>> that > > >>>>>>>>>>>>>>>>> I'd like to see addressed before voting. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> 1) If flexibility is the motivation for a pluggable > > >>>>> interface, > > >>>>>>>>> why > > >>>>>>>>>>>>>> are > > >>>>>>>>>>>>>>> we > > >>>>>>>>>>>>>>>>> only limiting the uses for this interface to two very > > >>>>> specific > > >>>>>>>>>>>>>>> scenarios? > > >>>>>>>>>>>>>>>>> Why not also allow, e.g., authorization errors to be > > >>>> handled > > >>>>>>>> as > > >>>>>>>>>>>>> well > > >>>>>>>>>>>>>>>>> (allowing users to drop records destined for some > > >>>> off-limits > > >>>>>>>>>>>>> topics, > > >>>>>>>>>>>>>> or > > >>>>>>>>>>>>>>>>> retry for a limited duration in case there's a delay in > > >>> the > > >>>>>>>>>>>>>> propagation > > >>>>>>>>>>>>>>>> of > > >>>>>>>>>>>>>>>>> ACL updates)? It'd be nice to see some analysis of > > >> other > > >>>>>>>> errors > > >>>>>>>>>>>>> that > > >>>>>>>>>>>>>>>> could > > >>>>>>>>>>>>>>>>> be handled with this new API, both to avoid the > > >> follow-up > > >>>>> work > > >>>>>>>>> of > > >>>>>>>>>>>>>>> another > > >>>>>>>>>>>>>>>>> KIP to address them in the future, and to make sure > > >> that > > >>>>> we're > > >>>>>>>>> not > > >>>>>>>>>>>>>>>> painting > > >>>>>>>>>>>>>>>>> ourselves into a corner with the current API in a way > > >>> that > > >>>>>>>> would > > >>>>>>>>>>>>> make > > >>>>>>>>>>>>>>>>> future modifications difficult. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> 2) Something feels a bit off with how retriable vs. > > >>>>>>>>> non-retriable > > >>>>>>>>>>>>>>> errors > > >>>>>>>>>>>>>>>>> are handled with the interface. Why not introduce two > > >>>>> separate > > >>>>>>>>>>>>>> methods > > >>>>>>>>>>>>>>> to > > >>>>>>>>>>>>>>>>> handle each case separately? 
That way there's no > > >>> ambiguity > > >>>> or > > >>>>>>>>>>>>>> implicit > > >>>>>>>>>>>>>>>>> behavior when, e.g., attempting to retry on a > > >>>>>>>>>>>>>> RecordTooLargeException. > > >>>>>>>>>>>>>>>> This > > >>>>>>>>>>>>>>>>> could be something like `NonRetriableResponse > > >>>>>>>>>>>>> handle(ProducerRecord, > > >>>>>>>>>>>>>>>>> Exception)` and `RetriableResponse > > >>>>>>>>> handleRetriable(ProducerRecord, > > >>>>>>>>>>>>>>>>> Exception)`, though the exact names and shape can > > >>> obviously > > >>>>> be > > >>>>>>>>>>>>> toyed > > >>>>>>>>>>>>>>>> with a > > >>>>>>>>>>>>>>>>> bit. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> 3) Although the flexibility of a pluggable interface > > >> may > > >>>>>>>> benefit > > >>>>>>>>>>>>> some > > >>>>>>>>>>>>>>>>> users' custom producer applications and Kafka Streams > > >>>>>>>>>> applications, > > >>>>>>>>>>>>>> it > > >>>>>>>>>>>>>>>>> comes at significant deployment cost for other > > >>> low-/no-code > > >>>>>>>>>>>>>>> environments, > > >>>>>>>>>>>>>>>>> including but not limited to Kafka Connect and > > >>> MirrorMaker > > >>>> 2. > > >>>>>>>>> Can > > >>>>>>>>>>>>> we > > >>>>>>>>>>>>>>> add > > >>>>>>>>>>>>>>>> a > > >>>>>>>>>>>>>>>>> default implementation of the exception handler that > > >>> allows > > >>>>>>>> for > > >>>>>>>>>>>>> some > > >>>>>>>>>>>>>>>> simple > > >>>>>>>>>>>>>>>>> behavior to be tweaked via configuration property? Two > > >>>> things > > >>>>>>>>> that > > >>>>>>>>>>>>>>> would > > >>>>>>>>>>>>>>>> be > > >>>>>>>>>>>>>>>>> nice to have would be A) an upper bound on the retry > > >> time > > >>>> for > > >>>>>>>>>>>>>>>>> unknown-topic-partition exceptions and B) an option to > > >>> drop > > >>>>>>>>>> records > > >>>>>>>>>>>>>>> that > > >>>>>>>>>>>>>>>>> are large enough to trigger a record-too-large > > >> exception. 
> > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> 4) I'd still prefer to see "SKIP" or "DROP" instead of > > >>> the > > >>>>>>>>>> proposed > > >>>>>>>>>>>>>>>>> "SWALLOW" option, which IMO is opaque and non-obvious, > > >>>>>>>>> especially > > >>>>>>>>>>>>>> when > > >>>>>>>>>>>>>>>>> trying to guess the behavior for retriable errors. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Cheers, > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Chris > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> On Fri, May 3, 2024 at 11:23 AM Alieh Saeedi > > >>>>>>>>>>>>>>>> <asae...@confluent.io.invalid > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Hi all, > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> A summary of the KIP and the discussions: > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> The KIP introduces a handler interface for Producer in > > >>>> order > > >>>>>>>> to > > >>>>>>>>>>>>>>> handle > > >>>>>>>>>>>>>>>>> two > > >>>>>>>>>>>>>>>>>> exceptions: RecordTooLargeException and > > >>>>>>>>>>>>>>>> UnknownTopicOrPartitionException. > > >>>>>>>>>>>>>>>>>> The handler handles the exceptions per-record. > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> - Do we need this handler? [Motivation and Examples > > >>>>>>>> sections] > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> RecordTooLargeException: 1) In transactions, the > > >>> producer > > >>>>>>>>>>>>> collects > > >>>>>>>>>>>>>>>>> multiple > > >>>>>>>>>>>>>>>>>> records in batches. Then a RecordTooLargeException > > >>> related > > >>>>>>>> to a > > >>>>>>>>>>>>>>> single > > >>>>>>>>>>>>>>>>>> record leads to failing the entire batch. A custom > > >>>> exception > > >>>>>>>>>>>>>> handler > > >>>>>>>>>>>>>>> in > > >>>>>>>>>>>>>>>>>> this case may decide on dropping the record and > > >>> continuing > > >>>>>>>> the > > >>>>>>>>>>>>>>>>> processing. > > >>>>>>>>>>>>>>>>>> See Example 1, please. 
> > >>>>>>>>>>>>>>>>>> 2) Moreover, in Kafka Streams, a record that is too large is a poison pill record, and there is no way to skip over it. A handler would allow us to react to this error inside the producer, i.e., local to where the error happens, and thus simplify the overall code significantly. Please read the Motivation section for more explanation.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> UnknownTopicOrPartitionException: For this case, the producer handles this exception internally and only issues a WARN log about missing metadata and retries internally. Later, when the producer hits "delivery.timeout.ms" it throws a TimeoutException, and the user can only blindly retry, which may result in an infinite retry loop. The thrown TimeoutException "cuts" the connection to the underlying root cause of missing metadata (which could indeed be a transient error but is persistent for a non-existing topic). Thus, there is no programmatic way to break the infinite retry loop.
> > >>>>>>>>>>>>>>>>>> Kafka Streams also blindly retries for this case, and the application gets stuck.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> - Having interface vs configuration option: [Motivation, Examples, and Rejected Alternatives sections]
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Our solution is introducing an interface due to the full flexibility that it offers. Sometimes users, especially Kafka Streams ones, determine the handler's behaviour based on the situation. For example, facing UnknownTopicOrPartitionException, the user may want to raise an error for some topics but retry it for other topics. Having a configuration option with a fixed set of possibilities does not serve the user's needs. See Example 2, please.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> - Note on RecordTooLargeException: [Public Interfaces section]
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> If the custom handler decides on SWALLOW for RecordTooLargeException, then this record will not be a part of the batch of transactions and will also not be sent to the broker in non-transactional mode.
> > >> So > > >>> no > > >>>>>>>>>>>>> worries > > >>>>>>>>>>>>>>>> about > > >>>>>>>>>>>>>>>>>> getting a RecordTooLargeException from the broker in > > >>> this > > >>>>>>>> case, > > >>>>>>>>>>>>> as > > >>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>> record will never ever be sent to the broker. SWALLOW > > >>>> means > > >>>>>>>>> drop > > >>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>> record > > >>>>>>>>>>>>>>>>>> and continue/swallow the error. > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> - What if the handle() method implements RETRY for > > >>>>>>>>>>>>>>>>> RecordTooLargeException? > > >>>>>>>>>>>>>>>>>> [Proposed Changes section] > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> We have to limit the user to only have FAIL or SWALLOW > > >>> for > > >>>>>>>>>>>>>>>>>> RecordTooLargeException. Actually, RETRY must be equal > > >>> to > > >>>>>>>> FAIL. > > >>>>>>>>>>>>>> This > > >>>>>>>>>>>>>>> is > > >>>>>>>>>>>>>>>>>> well documented/informed in javadoc. > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> - What if the handle() method of the handler throws an > > >>>>>>>>> exception? > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> The handler is expected to have correct code. If it > > >>> throws > > >>>>> an > > >>>>>>>>>>>>>>>> exception, > > >>>>>>>>>>>>>>>>>> everything fails. > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> This is a PoC PR < > > >>>>> https://github.com/apache/kafka/pull/15846 > > >>>>>>>>> > > >>>>>>>>>>>>> ONLY > > >>>>>>>>>>>>>>> for > > >>>>>>>>>>>>>>>>>> RecordTooLargeException. The code changes related to > > >>>>>>>>>>>>>>>>>> UnknownTopicOrPartitionException will be added to this > > >>> PR > > >>>>>>>>> LATER. > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Looking forward to your feedback again. 
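The two rules stated in the summary above (RETRY on a RecordTooLargeException is treated as FAIL, and a handler that itself throws makes everything fail) could be sketched roughly as follows. This is only an illustration under stated assumptions: `HandlerDispatch`, `resolve`, the `Response` enum, and the `ExceptionHandler` interface are hypothetical names invented for this sketch and are not taken from the PoC PR or the Kafka client.

```java
// Hypothetical stand-ins for illustration only; NOT the Kafka client classes.
class RecordTooLargeException extends RuntimeException {}

enum Response { FAIL, RETRY, SWALLOW }

// Hypothetical single-method handler shape.
interface ExceptionHandler {
    Response handle(String topic, Exception error);
}

// Hypothetical helper showing how a caller might normalize the handler's answer.
final class HandlerDispatch {
    private HandlerDispatch() {}

    static Response resolve(ExceptionHandler handler, String topic, Exception error) {
        Response response;
        try {
            response = handler.handle(topic, error);
        } catch (RuntimeException handlerFailure) {
            // A throwing handler is treated as a failure, never as a silent skip.
            return Response.FAIL;
        }
        // Retrying cannot help a record that is too large, so RETRY is mapped to FAIL.
        if (error instanceof RecordTooLargeException && response == Response.RETRY) {
            return Response.FAIL;
        }
        return response;
    }
}
```

For example, `HandlerDispatch.resolve((tp, ex) -> Response.RETRY, "orders", new RecordTooLargeException())` yields `FAIL` in this sketch, while the same handler applied to another exception type keeps its `RETRY` answer.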
> > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Cheers, > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Alieh > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> On Thu, Apr 25, 2024 at 11:46 PM Kirk True < > > >>>>>>>> k...@kirktrue.pro> > > >>>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> Hi Alieh, > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> Thanks for the updates! > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> Comments inline... > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> On Apr 25, 2024, at 1:10 PM, Alieh Saeedi > > >>>>>>>>>>>>>>>>> <asae...@confluent.io.INVALID > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> Hi all, > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> Thanks a lot for the constructive feedbacks! > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> Addressing some of the main concerns: > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> - The `RecordTooLargeException` can be thrown by > > >>> broker, > > >>>>>>>>>>>>>> producer > > >>>>>>>>>>>>>>>> and > > >>>>>>>>>>>>>>>>>>>> consumer. Of course, the `ProducerExceptionHandler` > > >>>>>>>> interface > > >>>>>>>>>>>>>> is > > >>>>>>>>>>>>>>>>>>> introduced > > >>>>>>>>>>>>>>>>>>>> to affect only the exceptions thrown from the > > >>> producer. > > >>>>>>>> This > > >>>>>>>>>>>>>> KIP > > >>>>>>>>>>>>>>>> very > > >>>>>>>>>>>>>>>>>>>> specifically means to provide a possibility to > > >> manage > > >>>> the > > >>>>>>>>>>>>>>>>>>>> `RecordTooLargeException` thrown from the > > >>>> Producer.send() > > >>>>>>>>>>>>>> method. > > >>>>>>>>>>>>>>>>>> Please > > >>>>>>>>>>>>>>>>>>>> see “Proposed Changes” section for more clarity. I > > >>>>>>>>>>>>> investigated > > >>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>> issue > > >>>>>>>>>>>>>>>>>>>> there thoroughly. 
I hope it can explain the concern > > >>>> about > > >>>>>>>> how > > >>>>>>>>>>>>>> we > > >>>>>>>>>>>>>>>>> handle > > >>>>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>> errors as well. > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> - The problem with Callback: Methods of Callback are > > >>>>> called > > >>>>>>>>>>>>>> when > > >>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>> record > > >>>>>>>>>>>>>>>>>>>> sent to the server is acknowledged, while this is > > >> not > > >>>> the > > >>>>>>>>>>>>>> desired > > >>>>>>>>>>>>>>>>> time > > >>>>>>>>>>>>>>>>>>> for > > >>>>>>>>>>>>>>>>>>>> all exceptions. We intend to handle exceptions > > >>>> beforehand. > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> I guess it makes sense to keep the expectation for > > >> when > > >>>>>>>>>>>>> Callback > > >>>>>>>>>>>>>> is > > >>>>>>>>>>>>>>>>>>> invoked as-is vs. shoehorning more into it. > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> - What if the custom handler returns RETRY for > > >>>>>>>>>>>>>>>>>>> `RecordTooLargeException`? I > > >>>>>>>>>>>>>>>>>>>> assume changing the producer configuration at > > >> runtime > > >>> is > > >>>>>>>>>>>>>>> possible. > > >>>>>>>>>>>>>>>> If > > >>>>>>>>>>>>>>>>>> so, > > >>>>>>>>>>>>>>>>>>>> RETRY for a too large record is valid because maybe > > >> in > > >>>> the > > >>>>>>>>>>>>> next > > >>>>>>>>>>>>>>>> try, > > >>>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>> too large record is not poisoning any more. I am not > > >>>> 100% > > >>>>>>>>>>>>> sure > > >>>>>>>>>>>>>>>> about > > >>>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>> technical details, though. Otherwise, we can > > >> consider > > >>>> the > > >>>>>>>>>>>>> RETRY > > >>>>>>>>>>>>>>> as > > >>>>>>>>>>>>>>>>> FAIL > > >>>>>>>>>>>>>>>>>>> for > > >>>>>>>>>>>>>>>>>>>> this exception. 
Another solution would be to > > >> consider > > >>> a > > >>>>>>>>>>>>>> constant > > >>>>>>>>>>>>>>>>> number > > >>>>>>>>>>>>>>>>>>> of > > >>>>>>>>>>>>>>>>>>>> times for RETRY which can be useful for other > > >>> exceptions > > >>>>> as > > >>>>>>>>>>>>>> well. > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> It’s not presently possible to change the > > >> configuration > > >>>> of > > >>>>>>>> an > > >>>>>>>>>>>>>>>> existing > > >>>>>>>>>>>>>>>>>>> Producer at runtime. So if a record hits a > > >>>>>>>>>>>>>> RecordTooLargeException > > >>>>>>>>>>>>>>>>> once, > > >>>>>>>>>>>>>>>>>> no > > >>>>>>>>>>>>>>>>>>> amount of retrying (with the current Producer) will > > >>>> change > > >>>>>>>>> that > > >>>>>>>>>>>>>>> fact. > > >>>>>>>>>>>>>>>>> So > > >>>>>>>>>>>>>>>>>>> I’m still a little stuck on how to handle a response > > >> of > > >>>>>>>> RETRY > > >>>>>>>>>>>>> for > > >>>>>>>>>>>>>>> an > > >>>>>>>>>>>>>>>>>>> “oversized” record. > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> - What if the handle() method itself throws an > > >>>> exception? > > >>>>> I > > >>>>>>>>>>>>>> think > > >>>>>>>>>>>>>>>>>>>> rationally and pragmatically, the behaviour must be > > >>>>> exactly > > >>>>>>>>>>>>>> like > > >>>>>>>>>>>>>>>> when > > >>>>>>>>>>>>>>>>>> no > > >>>>>>>>>>>>>>>>>>>> custom handler is defined since the user actually > > >> did > > >>>> not > > >>>>>>>>>>>>> have > > >>>>>>>>>>>>>> a > > >>>>>>>>>>>>>>>>>> working > > >>>>>>>>>>>>>>>>>>>> handler. > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> I’m not convinced that ignoring an errant handler is > > >>> the > > >>>>>>>> right > > >>>>>>>>>>>>>>>> choice. > > >>>>>>>>>>>>>>>>> It > > >>>>>>>>>>>>>>>>>>> then becomes a silent failure that might have > > >>>>> repercussions, > > >>>>>>>>>>>>>>>> depending > > >>>>>>>>>>>>>>>>> on > > >>>>>>>>>>>>>>>>>>> the business logic. 
> A user would have to proactively trawl through the logs for WARN/ERROR messages to catch it.
>
> Throwing a hard error is pretty draconian, though…
>
> > - Why not use config parameters instead of an interface? As explained in the “Rejected Alternatives” section, we assume that the handler will be used for a greater number of exceptions in the future. Defining a configuration parameter for each exception may make the configuration a bit messy. Moreover, the handler offers more flexibility.
>
> Agreed that the logic-via-configuration approach is weird and limiting. Forget I ever suggested it ;)
>
> I’d think additional background in the Motivation section would help me understand how users might use this feature beyond a) skipping “oversized” records, and b) not retrying missing topics.
>
> > Small change:
> >
> > - ProductionExceptionHandlerResponse -> Response for brevity and simplicity. Could’ve been HandlerResponse too I think!
>
> The name change sounds good to me.
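For readers following the thread, here is a minimal sketch of how the three open questions above (RETRY on an oversized record, a constant retry cap, and a handler that itself throws) could fit together. Everything in it is a stand-in invented for illustration: `HandlerSketch`, `Handler`, `Response`, `TooLargeError`, and `BoundedRetryHandler` are not Kafka classes, and the `recordKey` parameter is hypothetical. Mapping a throwing handler to FAIL is only one of the options debated here, not a decision taken in the KIP.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins only; none of these types exist in the Kafka client.
class HandlerSketch {

    /** The three outcomes discussed in the thread. */
    enum Response { FAIL, RETRY, SWALLOW }

    /** Assumed shape of a user-supplied handler (not the KIP's interface). */
    interface Handler {
        Response handle(String recordKey, Exception error);
    }

    /** Stand-in for an "oversized record" error. */
    static class TooLargeError extends RuntimeException { }

    /** Wraps a user handler and applies the guard rails debated above. */
    static class BoundedRetryHandler implements Handler {
        private final Handler delegate;
        private final int maxRetries;
        // Number of RETRY responses already granted per record key.
        private final Map<String, Integer> attempts = new HashMap<>();

        BoundedRetryHandler(Handler delegate, int maxRetries) {
            this.delegate = delegate;
            this.maxRetries = maxRetries;
        }

        @Override
        public Response handle(String recordKey, Exception error) {
            Response response;
            try {
                response = delegate.handle(recordKey, error);
            } catch (RuntimeException handlerBug) {
                // Assumption: a broken handler fails loudly instead of being ignored.
                return Response.FAIL;
            }
            if (response != Response.RETRY) {
                attempts.remove(recordKey);
                return response;
            }
            if (error instanceof TooLargeError) {
                // Retrying cannot help while the producer config is fixed.
                return Response.FAIL;
            }
            int used = attempts.merge(recordKey, 1, Integer::sum);
            if (used > maxRetries) {
                attempts.remove(recordKey);
                return Response.FAIL; // constant retry budget exhausted
            }
            return Response.RETRY;
        }
    }
}
```

With a budget of 2, a delegate that always answers RETRY gets two retries for a given key and then a FAIL. An oversized record is failed immediately regardless of what the delegate says.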
>
> Thanks Alieh!
>
> > I thank you all again for your useful questions/suggestions.
> >
> > I would be happy to hear more of your concerns, as stated in some feedback.
> >
> > Cheers,
> > Alieh
> >
> > On Wed, Apr 24, 2024 at 12:31 AM Justine Olshan <jols...@confluent.io.invalid> wrote:
> >
> > > Thanks Alieh for the updates.
> > >
> > > I'm a little concerned about the design pattern here. It seems like we want specific usages, but we are packaging it as a generic handler. I think we tried to narrow down on the specific errors we want to handle, but it feels a little clunky as we have a generic thing for two specific errors.
> > >
> > > I'm wondering if we are using the right patterns to solve these problems. I agree though that we will need something more than the error classes I'm proposing if we want to have different handling be configurable.
> > > My concern is that the open-endedness of a handler means that we are creating more problems than we are solving. It is still unclear to me how we expect to handle the errors. Perhaps we could include an example? It seems like there is a specific use case in mind and maybe we can