Hi Alieh,

Thank you for all the updates! One final question--how will the retry
timeout for unknown topic partition errors be implemented? I think it would
be best if this could be done with an implementation of the error handler,
but I don't see a way to track the necessary information with the
current ProducerExceptionHandler interface.

Cheers,

Chris

On Tue, May 14, 2024 at 9:10 AM Alieh Saeedi <asae...@confluent.io.invalid>
wrote:

> Thanks Andrew. Done :)
>
> @Chris: I changed the config parameter type from boolean to integer, which
> defines the timeout for retrying. I thought reusing `max.block.ms` was not
> reasonable as you mentioned.
>
> So if the KIP looks good, let's skip to the good part ;-) VOTING :)
>
> Bests,
> Alieh
>
> On Tue, May 14, 2024 at 12:26 PM Andrew Schofield <andrew_schofi...@live.com>
> wrote:
>
> > Hi Alieh,
> > Just one final comment.
> >
> > [AJS5] Existing classes use Retriable, not Retryable. For example:
> >
> >
> > https://kafka.apache.org/21/javadoc/org/apache/kafka/common/errors/RetriableException.html
> >
> > I suggest RetriableResponse and NonRetriableResponse.
> >
> > Thanks,
> > Andrew
> >
> > > On 13 May 2024, at 23:17, Alieh Saeedi <asae...@confluent.io.INVALID>
> > > wrote:
> > >
> > > Hi all,
> > >
> > >
> > > Thanks for all the valid points you listed.
> > >
> > >
> > > KIP updates and addressing concerns:
> > >
> > >
> > > 1) The KIP now suggests two Response types: `RetryableResponse` and
> > > `NonRetryableResponse`
> > >
> > >
> > > 2) `custom.exception.handler` is changed to `custom.exception.handler.class`
> > >
> > >
> > > 3) The KIP clarifies that `In the case of an implemented handler for the
> > > specified exception, the handler takes precedence.`
> > >
> > >
> > > 4)  There is now a `default` implementation for both handle() methods.
> > >
> > >
> > > 5)  @Chris: for `UnknownTopicOrPartition`, the default is already retrying
> > > for 60s (in fact, the default value of `max.block.ms`). If the handler
> > > instructs to FAIL or SWALLOW, there will be no retry, and if the handler
> > > instructs to RETRY, that will be the default behavior, which follows the
> > > values in already existing config parameters such as `max.block.ms`. Does
> > > that make sense?
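For illustration, a minimal Java sketch of the interface shape described in points 1 to 5 above, using the `RetriableResponse`/`NonRetriableResponse` spelling Andrew suggested. This is not the KIP's code: the exception classes and `SimpleRecord` are local stand-ins so the sketch compiles without kafka-clients, and the two defaults (FAIL for an oversized record, RETRY for an unknown topic or partition) are assumed to match today's producer behavior.

```java
import java.io.Closeable;

// Local stand-ins so the sketch compiles on its own; the real types are
// org.apache.kafka.common.errors.RecordTooLargeException and
// org.apache.kafka.common.errors.UnknownTopicOrPartitionException.
class RecordTooLargeException extends RuntimeException { }

class UnknownTopicOrPartitionException extends RuntimeException { }

// Hypothetical minimal record type standing in for ProducerRecord<K, V>.
class SimpleRecord {
    final String topic;
    final byte[] value;

    SimpleRecord(String topic, byte[] value) {
        this.topic = topic;
        this.value = value;
    }
}

// No RETRY constant: retrying a record that is too large can never succeed,
// so the compiler rejects a handler that tries to return it.
enum NonRetriableResponse { FAIL, SWALLOW }

enum RetriableResponse { FAIL, SWALLOW, RETRY }

interface ProducerExceptionHandler extends Closeable {
    // Assumed default: fail the send, as the producer does today.
    default NonRetriableResponse handle(RecordTooLargeException e, SimpleRecord record) {
        return NonRetriableResponse.FAIL;
    }

    // Assumed default: keep retrying, bounded by the producer's existing timeouts.
    default RetriableResponse handle(UnknownTopicOrPartitionException e, SimpleRecord record) {
        return RetriableResponse.RETRY;
    }

    // Narrows Closeable.close() so handlers without resources need not override it.
    @Override
    default void close() { }
}

// Example implementation: drop oversized records for "metrics-" topics only,
// and inherit the default for unknown topics/partitions.
class DropOversizedMetrics implements ProducerExceptionHandler {
    @Override
    public NonRetriableResponse handle(RecordTooLargeException e, SimpleRecord record) {
        return record.topic.startsWith("metrics-")
                ? NonRetriableResponse.SWALLOW
                : NonRetriableResponse.FAIL;
    }
}
```

Because `NonRetriableResponse` has no RETRY constant, a handler that tries to retry an oversized record does not compile, which is the kind of type-level constraint discussed further down in the thread.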
> > >
> > >
> > > Hope the changes and explanations are convincing :)
> > >
> > >
> > > Cheers,
> > >
> > > Alieh
> > >
> > > On Mon, May 13, 2024 at 6:40 PM Justine Olshan <jols...@confluent.io.invalid>
> > > wrote:
> > >
> > >> Oh I see. The type isn't the error type but a newly defined type for the
> > >> response. Makes sense and works for me.
> > >>
> > >> Justine
> > >>
> > >> On Mon, May 13, 2024 at 9:13 AM Chris Egerton <fearthecel...@gmail.com>
> > >> wrote:
> > >>
> > >>> If we have dedicated methods for each kind of exception
> > >>> (handleRecordTooLarge, handleUnknownTopicOrPartition, etc.), doesn't that
> > >>> provide sufficient constraint? I'm not suggesting we eliminate these
> > >>> methods, just that we change their return types to something more
> > >>> flexible.
> > >>>
> > >>> On Mon, May 13, 2024, 12:07 Justine Olshan <jols...@confluent.io.invalid>
> > >>> wrote:
> > >>>
> > >>>> I'm not sure I agree with the Retriable and NonRetriableResponse comment.
> > >>>> This doesn't limit the blast radius or enforce that certain errors are used.
> > >>>> I think we might disagree on how controlled these interfaces can be...
> > >>>>
> > >>>> Justine
> > >>>>
> > >>>> On Mon, May 13, 2024 at 8:40 AM Chris Egerton <chr...@aiven.io.invalid>
> > >>>> wrote:
> > >>>>
> > >>>>> Hi Alieh,
> > >>>>>
> > >>>>> Thanks for the updates! I just have a few more thoughts:
> > >>>>>
> > >>>>> - I don't think a boolean property is sufficient to dictate retries for
> > >>>>> unknown topic partitions, though. These errors can occur if a topic has
> > >>>>> just been created, which can happen if, for example, automatic topic
> > >>>>> creation is enabled for a multi-task connector. This is why I proposed a
> > >>>>> timeout instead of a boolean (and see my previous email for why reducing
> > >>>>> max.block.ms for a producer is not a viable alternative). If it helps, one
> > >>>>> way to reproduce this yourself is to add the line
> > >>>>> `fooProps.put(TASKS_MAX_CONFIG, "10");` to the integration test here:
> > >>>>> https://github.com/apache/kafka/blob/5439914c32fa00d634efa7219699f1bc21add839/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java#L134
> > >>>>> and then check the logs afterward for messages like "Error while fetching
> > >>>>> metadata with correlation id <n> : {foo-topic=UNKNOWN_TOPIC_OR_PARTITION}".
> > >>>>>
> > >>>>> - I also don't think we need custom XxxResponse enums for every possible
> > >>>>> method; it seems like this will lead to a lot of duplication and cognitive
> > >>>>> overhead if we want to expand the error handler in the future. Something
> > >>>>> more flexible like RetriableResponse and NonRetriableResponse could
> > >>>>> suffice.
> > >>>>>
> > >>>>> - Finally, the KIP still doesn't state how the handler will or won't take
> > >>>>> precedence over existing retry properties. If I set `retries` or
> > >>>>> `delivery.timeout.ms` or `max.block.ms` to low values, will that cause
> > >>>>> retries to cease even if my custom handler would otherwise keep returning
> > >>>>> RETRY for an error?
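One way to picture the interplay raised in the last question, sketched in Java under the assumption Chris states in his May 8 mail (quoted later in this thread): the existing `retries` and `delivery.timeout.ms` limits win over a RETRY verdict. The KIP does not specify this, and `HandlerVerdict` and `RetryDecision.resolve` are hypothetical names.

```java
import java.time.Duration;

// Hypothetical verdict a handler might return for one failed attempt.
enum HandlerVerdict { FAIL, SWALLOW, RETRY }

final class RetryDecision {
    private RetryDecision() { }

    // Assumption, not KIP text: existing producer limits are checked after the
    // handler runs, and an exhausted budget turns a RETRY verdict into FAIL.
    // FAIL and SWALLOW verdicts are passed through unchanged.
    static HandlerVerdict resolve(HandlerVerdict fromHandler,
                                  Duration sinceSend,
                                  Duration deliveryTimeout,
                                  int retriesSoFar,
                                  int maxRetries) {
        boolean budgetExhausted = sinceSend.compareTo(deliveryTimeout) >= 0
                || retriesSoFar >= maxRetries;
        if (fromHandler == HandlerVerdict.RETRY && budgetExhausted) {
            return HandlerVerdict.FAIL;
        }
        return fromHandler;
    }
}
```

Under this assumption, and with the producer defaults of `retries` (Integer.MAX_VALUE) and `delivery.timeout.ms` (120000 ms), the delivery timeout is what would normally end a run of handler-requested retries.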
> > >>>>>
> > >>>>> Cheers,
> > >>>>>
> > >>>>> Chris
> > >>>>>
> > >>>>> On Mon, May 13, 2024 at 11:02 AM Andrew Schofield <
> > >>>>> andrew_schofi...@live.com>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> Hi Alieh,
> > >>>>>> Just a few more comments on the KIP. It is looking much less risky now
> > >>>>>> that the scope is tighter.
> > >>>>>>
> > >>>>>> [AJS1] It would be nice to have default implementations of the handle
> > >>>>>> methods so an implementor would not need to implement both themselves.
> > >>>>>>
> > >>>>>> [AJS2] Producer configurations which are class names usually end in
> > >>>>>> “.class”. I suggest “custom.exception.handler.class”.
> > >>>>>>
> > >>>>>> [AJS3] If I implemented a handler, and I set a non-default value for one
> > >>>>>> of the new configurations, what happens? I would expect that the handler
> > >>>>>> takes precedence. I wasn’t quite clear what “the control will follow the
> > >>>>>> handler instructions” meant.
> > >>>>>>
> > >>>>>> [AJS4] Because you now have an enum for the RecordTooLargeExceptionResponse,
> > >>>>>> I don’t think you need to state in the comment for ProducerExceptionHandler
> > >>>>>> that RETRY will be interpreted as FAIL.
> > >>>>>>
> > >>>>>> Thanks,
> > >>>>>> Andrew
> > >>>>>>
> > >>>>>>> On 13 May 2024, at 14:53, Alieh Saeedi <asae...@confluent.io.INVALID>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>> Hi all,
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> Thanks for the very interesting discussion during my PTO.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> KIP updates and addressing concerns:
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> 1) Two handle() methods are defined in ProducerExceptionHandler for the
> > >>>>>>> two exceptions with different input parameters so that we have
> > >>>>>>> handle(RecordTooLargeException e, ProducerRecord record) and
> > >>>>>>> handle(UnknownTopicOrPartitionException e, ProducerRecord record)
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> 2) The ProducerExceptionHandler extends `Closeable` as well.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> 3) The KIP suggests having two more configuration parameters with boolean
> > >>>>>>> values:
> > >>>>>>>
> > >>>>>>> - `drop.invalid.large.records` with a default value of `false` for
> > >>>>>>> swallowing records that are too large.
> > >>>>>>>
> > >>>>>>> - `retry.unknown.topic.partition` with a default value of `true` that
> > >>>>>>> performs RETRY for `max.block.ms` ms when encountering the
> > >>>>>>> UnknownTopicOrPartitionException.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> Hope the main concerns are addressed so that we can go forward with
> > >>>>>>> voting.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> Cheers,
> > >>>>>>>
> > >>>>>>> Alieh
> > >>>>>>>
> > >>>>>>> On Thu, May 9, 2024 at 11:25 PM Artem Livshits
> > >>>>>>> <alivsh...@confluent.io.invalid> wrote:
> > >>>>>>>
> > >>>>>>>> Hi Matthias,
> > >>>>>>>>
> > >>>>>>>>> [AL1] While I see the point, I would think having a different callback
> > >>>>>>>>> for every exception might not really be elegant?
> > >>>>>>>>
> > >>>>>>>> I'm not sure how to assess the level of elegance of the proposal, but I
> > >>>>>>>> can comment on the technical characteristics:
> > >>>>>>>>
> > >>>>>>>> 1. Having specific interfaces that codify the logic that is currently
> > >>>>>>>> prescribed in the comments reduces the chance of making a mistake.
> > >>>>>>>> Comments may get ignored, misunderstood, etc., but if the contract is
> > >>>>>>>> codified, the compiler will help to enforce the contract.
> > >>>>>>>> 2. Given that the logic is trickier than it seems (the record-too-large
> > >>>>>>>> case is an example that can easily confuse someone who's not intimately
> > >>>>>>>> familiar with the nuances of the batching logic), having a few more hoops
> > >>>>>>>> to jump through would give a greater chance that whoever tries to add a
> > >>>>>>>> new case pauses and thinks a bit more.
> > >>>>>>>> 3. As Justine pointed out, having different methods will be a forcing
> > >>>>>>>> function to go through a KIP rather than smuggle new cases through
> > >>>>>>>> implementation.
> > >>>>>>>> 4. Sort of a consequence of the previous 3 -- all those things reduce the
> > >>>>>>>> chance of someone writing code that works with 2 errors and then, when
> > >>>>>>>> more errors are added in the future, suddenly and incorrectly ignores the
> > >>>>>>>> new errors (the example I gave in the previous email).
> > >>>>>>>>
> > >>>>>>>>> [AL2 cont.] Similar to AL1, I see such a handler to some extent as
> > >>>>>>>>> business logic. If a user puts a bad filter condition in their KS app,
> > >>>>>>>>> and drops messages
> > >>>>>>>>
> > >>>>>>>> I agree that there is always a chance to get a bug and lose messages, but
> > >>>>>>>> there is generally a separation of concerns with different risk profiles:
> > >>>>>>>> the filtering logic may be more rigorously tested and rarely changed (say,
> > >>>>>>>> an application developer does it), but setting the topics to produce to
> > >>>>>>>> may be done via configuration (e.g., a user of the application does it),
> > >>>>>>>> and it's generally expected that users get an error when the
> > >>>>>>>> configuration is incorrect.
> > >>>>>>>>
> > >>>>>>>> What could be worse is that UnknownTopicOrPartitionException can be an
> > >>>>>>>> intermittent error, i.e., with a generally correct configuration, there
> > >>>>>>>> could be a metadata propagation problem on the cluster and then a random
> > >>>>>>>> set of records could get lost.
> > >>>>>>>>
> > >>>>>>>>> [AL3] Maybe I misunderstand what you are saying, but to me, checking the
> > >>>>>>>>> size of the record upfront is exactly what the KIP proposes? No?
> > >>>>>>>>
> > >>>>>>>> It achieves the same result but solves it differently. My proposal:
> > >>>>>>>>
> > >>>>>>>> 1. Application checks the validity of a record (maybe via a new
> > >>>>>>>> validateRecord method) before producing it, and can just exclude it or
> > >>>>>>>> return an error to the user.
> > >>>>>>>> 2. Application produces the record -- at this point there are no records
> > >>>>>>>> that could return record too large; they were either skipped at step 1 or
> > >>>>>>>> we didn't get here because step 1 failed.
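Step 1 of this alternative could look roughly like the Java sketch below. `validateRecord` is the hypothetical method named above; the fixed overhead allowance is an assumption, because the producer's real size estimate depends on the record format, headers, and compression, and the broker's `message.max.bytes` (or a topic's `max.message.bytes`) may be stricter than the producer's `max.request.size`.

```java
import java.util.ArrayList;
import java.util.List;

final class RecordValidator {
    // Assumed allowance for per-record and per-batch framing; not the exact
    // value the producer uses internally.
    static final int ASSUMED_OVERHEAD_BYTES = 128;

    private final int maxRecordBytes;

    RecordValidator(int maxRecordBytes) {
        this.maxRecordBytes = maxRecordBytes;
    }

    // Hypothetical pre-send check that depends only on the record and the
    // configured limit, so under these assumptions a rejected record could
    // never succeed on retry.
    boolean validateRecord(byte[] key, byte[] value) {
        long size = (long) ASSUMED_OVERHEAD_BYTES
                + (key == null ? 0 : key.length)
                + (value == null ? 0 : value.length);
        return size <= maxRecordBytes;
    }

    // Step 1 applied to a list of serialized values: keep only those that pass.
    List<byte[]> filterSendable(List<byte[]> values) {
        List<byte[]> sendable = new ArrayList<>();
        for (byte[] value : values) {
            if (validateRecord(null, value)) {
                sendable.add(value);
            }
        }
        return sendable;
    }
}
```

For example, `new RecordValidator(1_048_576)` uses the default `max.request.size` of 1 MiB as the limit.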
> > >>>>>>>>
> > >>>>>>>> Vs. KIP's proposal
> > >>>>>>>>
> > >>>>>>>> 1. Application produces the record.
> > >>>>>>>> 2. Application gets a callback.
> > >>>>>>>> 3. Application returns the action on how to proceed.
> > >>>>>>>>
> > >>>>>>>> The advantage of the former is the clarity of semantics -- the record is
> > >>>>>>>> invalid (a property of the record, not a function of server state or
> > >>>>>>>> server configuration), and we can clearly know that it is the record that
> > >>>>>>>> is bad and can never succeed.
> > >>>>>>>>
> > >>>>>>>> The KIP-proposed way has a very tricky point: it actually handles only a
> > >>>>>>>> subset of record-too-large exceptions. The broker can return
> > >>>>>>>> record-too-large and reject the whole batch (but we don't want to ignore
> > >>>>>>>> those, because then we could skip random records that just happened to be
> > >>>>>>>> in the same batch). In some sense, we use the same error for 2 different
> > >>>>>>>> conditions, and understanding that requires a pretty deep understanding
> > >>>>>>>> of Kafka internals.
> > >>>>>>>>
> > >>>>>>>> -Artem
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> On Wed, May 8, 2024 at 9:47 AM Justine Olshan <jols...@confluent.io.invalid>
> > >>>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> My concern with respect to it being fragile: the code that ensures the
> > >>>>>>>>> error type is internal to the producer. Someone may see it and say, I want
> > >>>>>>>>> to add such and such error. This looks like internal code, so I don't need
> > >>>>>>>>> a KIP, and then they can change it to whatever they want, thinking it is
> > >>>>>>>>> within the typical Kafka improvement protocol.
> > >>>>>>>>>
> > >>>>>>>>> Relying on an internal change to enforce an external API is fragile in my
> > >>>>>>>>> opinion. That's why I sort of agreed with Artem on enforcing the error in
> > >>>>>>>>> the method signature -- part of the public API.
> > >>>>>>>>>
> > >>>>>>>>> Chris's comments on requiring more information for the handler again make
> > >>>>>>>>> me wonder if we are solving a problem of lack of information at the
> > >>>>>>>>> application level with a more powerful solution than we need. (I.e., if we
> > >>>>>>>>> had more information, could the application close and restart the
> > >>>>>>>>> transaction rather than having to drop records?) But I am happy to
> > >>>>>>>>> compromise with a handler that we can agree is sufficiently controlled
> > >>>>>>>>> and documented.
> > >>>>>>>>>
> > >>>>>>>>> Justine
> > >>>>>>>>>
> > >>>>>>>>> On Wed, May 8, 2024 at 7:20 AM Chris Egerton <chr...@aiven.io.invalid>
> > >>>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>
> > >>>>>>>>>> Continuing prior discussions:
> > >>>>>>>>>>
> > >>>>>>>>>> 1) Regarding the "flexibility" discussion, my overarching point is that I
> > >>>>>>>>>> don't see the point in allowing for this kind of pluggable logic without
> > >>>>>>>>>> also covering more scenarios. Take example 2 in the KIP: if we're going to
> > >>>>>>>>>> implement retries only on "important" topics when a topic partition isn't
> > >>>>>>>>>> found, why wouldn't we also want to be able to do this for other errors?
> > >>>>>>>>>> Again, taking authorization errors as an example, why wouldn't we want to
> > >>>>>>>>>> be able to fail when we can't write to "important" topics because the
> > >>>>>>>>>> producer principal lacks sufficient ACLs, and drop the record if the topic
> > >>>>>>>>>> isn't "important"? In a security-conscious environment with
> > >>>>>>>>>> runtime-dependent topic routing (which is a common feature of many source
> > >>>>>>>>>> connectors, such as the Debezium connectors), this seems fairly likely.
> > >>>>>>>>>>
> > >>>>>>>>>> 2) As far as changing the shape of the API goes, I like Artem's idea of
> > >>>>>>>>>> splitting out the interface based on specific exceptions. This may be a
> > >>>>>>>>>> little laborious to expand in the future, but if we really want to limit
> > >>>>>>>>>> the exceptions that we cover with the handler and move slowly and
> > >>>>>>>>>> cautiously, then IMO it'd be reasonable to reflect that in the interface.
> > >>>>>>>>>> I also acknowledge that there's no way to completely prevent people from
> > >>>>>>>>>> shooting themselves in the foot by implementing the API incorrectly, but I
> > >>>>>>>>>> think it's worth it to do what we can--including leveraging the Java
> > >>>>>>>>>> language's type system--to help them, so IMO there's value to eliminating
> > >>>>>>>>>> the implicit behavior of failing when a policy returns RETRY for a
> > >>>>>>>>>> non-retriable error. This can take a variety of shapes and I'm not going
> > >>>>>>>>>> to insist on anything specific, but I do want to again raise my concerns
> > >>>>>>>>>> with the current proposal and request that we find something a little
> > >>>>>>>>>> better.
> > >>>>>>>>>>
> > >>>>>>>>>> 3) Concerning the default implementation--actually, I meant what I wrote :)
> > >>>>>>>>>> I don't want a "second" default, I want an implementation of this
> > >>>>>>>>>> interface to be used as the default if no others are specified. The
> > >>>>>>>>>> behavior of this default implementation would be identical to existing
> > >>>>>>>>>> behavior (so there would be no backwards compatibility concerns like the
> > >>>>>>>>>> ones raised by Matthias), but it would be possible to configure this
> > >>>>>>>>>> default handler class to behave differently for a basic set of scenarios.
> > >>>>>>>>>> This would mirror (pun intended) the approach we've taken with Mirror
> > >>>>>>>>>> Maker 2 and its ReplicationPolicy interface [1]. There is a default
> > >>>>>>>>>> implementation available [2] that recognizes a handful of basic
> > >>>>>>>>>> configuration properties [3] for simple tweaks, but if users want, they
> > >>>>>>>>>> can also implement their own replication policy for more fine-grained
> > >>>>>>>>>> logic if those properties aren't flexible enough.
> > >>>>>>>>>>
> > >>>>>>>>>> More concretely, I'm imagining something like this for the producer
> > >>>>>>>>>> exception handler:
> > >>>>>>>>>>
> > >>>>>>>>>> - Default implementation class of
> > >>>>>>>>>>   org.apache.kafka.clients.producer.DefaultProducerExceptionHandler
> > >>>>>>>>>> - This class would recognize two properties:
> > >>>>>>>>>>   - drop.invalid.large.records: Boolean property, defaults to false. If
> > >>>>>>>>>>     "false", then causes the handler to return FAIL whenever a
> > >>>>>>>>>>     RecordTooLargeException is encountered; if "true", then causes
> > >>>>>>>>>>     SWALLOW/SKIP/DROP to be returned instead
> > >>>>>>>>>>   - unknown.topic.partition.retry.timeout.ms: Integer property, defaults
> > >>>>>>>>>>     to INT_MAX. Whenever an UnknownTopicOrPartitionException is
> > >>>>>>>>>>     encountered, causes the handler to return FAIL if that record has
> > >>>>>>>>>>     been pending for more than the retry timeout; otherwise, causes
> > >>>>>>>>>>     RETRY to be returned
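A minimal Java sketch of the default handler described in the list above, assuming the two property names and defaults proposed there. `DefaultProducerExceptionHandler` does not exist in Kafka today; `Response` and the two `on...` methods are hypothetical, and the pending time is passed in as a parameter because the current interface gives the handler no way to obtain it (the open question in point 6 of the same mail).

```java
import java.util.Map;

// Hypothetical response type shared by both cases in this sketch.
enum Response { FAIL, SWALLOW, RETRY }

class DefaultProducerExceptionHandler {
    static final String DROP_LARGE_CONFIG = "drop.invalid.large.records";
    static final String UNKNOWN_TOPIC_TIMEOUT_CONFIG = "unknown.topic.partition.retry.timeout.ms";

    // Defaults from the proposal: fail oversized records, retry unknown topics
    // for up to INT_MAX ms.
    private boolean dropInvalidLargeRecords = false;
    private long unknownTopicRetryTimeoutMs = Integer.MAX_VALUE;

    // Mirrors the shape of Kafka's Configurable.configure(Map<String, ?>).
    void configure(Map<String, ?> configs) {
        Object drop = configs.get(DROP_LARGE_CONFIG);
        if (drop != null) {
            dropInvalidLargeRecords = Boolean.parseBoolean(drop.toString());
        }
        Object timeout = configs.get(UNKNOWN_TOPIC_TIMEOUT_CONFIG);
        if (timeout != null) {
            unknownTopicRetryTimeoutMs = Long.parseLong(timeout.toString());
        }
    }

    Response onRecordTooLarge() {
        return dropInvalidLargeRecords ? Response.SWALLOW : Response.FAIL;
    }

    // pendingMs: time since the record was passed to Producer::send.
    Response onUnknownTopicOrPartition(long pendingMs) {
        return pendingMs > unknownTopicRetryTimeoutMs ? Response.FAIL : Response.RETRY;
    }
}
```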
> > >>>>>>>>>>
> > >>>>>>>>>> I think this is worth addressing now instead of later because it forces
> > >>>>>>>>>> us to evaluate the usefulness of this interface and it addresses a
> > >>>>>>>>>> long-standing issue not just with Kafka Connect, but with the Java
> > >>>>>>>>>> producer in general. For reference, here are a few tickets I collected
> > >>>>>>>>>> after briefly skimming our Jira showing that this is a real pain point
> > >>>>>>>>>> for users:
> > >>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-10340,
> > >>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-12990,
> > >>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-13634. Although this is
> > >>>>>>>>>> frequently reported with Kafka Connect, it applies to anyone who
> > >>>>>>>>>> configures a producer to use a high retry timeout. I am aware of the
> > >>>>>>>>>> max.block.ms property, but it's painful and IMO poor behavior to require
> > >>>>>>>>>> users to reduce the value of this property just to handle the single
> > >>>>>>>>>> scenario when trying to write to topics that don't exist, since it would
> > >>>>>>>>>> also limit the retry timeout for other operations that are legitimately
> > >>>>>>>>>> retriable.
> > >>>>>>>>>>
> > >>>>>>>>>> Raising new points:
> > >>>>>>>>>>
> > >>>>>>>>>> 5) I don't see the interplay between this handler and existing
> > >>>>>>>>>> retry-related properties mentioned anywhere in the KIP. I'm assuming
> > >>>>>>>>>> that properties like "retries", "max.block.ms", and "delivery.timeout.ms"
> > >>>>>>>>>> would take precedence over the handler and once they are exhausted, the
> > >>>>>>>>>> record/batch will fail no matter what? If so, it's probably worth
> > >>>>>>>>>> briefly mentioning this (no more than a sentence or two) in the KIP, and
> > >>>>>>>>>> if not, I'm curious what you have in mind.
> > >>>>>>>>>>
> > >>>>>>>>>> 6) I also wonder if the API provides enough information in its current
> > >>>>>>>>>> form. Would it be possible to provide handlers with some way of tracking
> > >>>>>>>>>> how long a record has been pending for (i.e., how long it's been since
> > >>>>>>>>>> the record was provided as an argument to Producer::send)? One way to do
> > >>>>>>>>>> this could be to add a method like `onNewRecord(ProducerRecord)` and
> > >>>>>>>>>> allow/require the handler to do its own bookkeeping, probably with a
> > >>>>>>>>>> matching `onRecordSuccess(ProducerRecord)` method so that the handler
> > >>>>>>>>>> doesn't eat up an ever-increasing amount of memory trying to track
> > >>>>>>>>>> records. An alternative could be to include information about the
> > >>>>>>>>>> initial time the record was received by the producer and the number of
> > >>>>>>>>>> retries that have been performed for it as parameters in the handle
> > >>>>>>>>>> method(s), but I'm not sure how easy this would be to implement and it
> > >>>>>>>>>> might clutter things up a bit too much.
> > >>>>>>>>>>
> > >>>>>>>>>> 7) A small request--can we add Closeable (or, if you prefer,
> > >>>>>>>>>> AutoCloseable) as a superinterface for the handler interface?
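The bookkeeping option from point 6, together with the Closeable superinterface from point 7, might be sketched in Java as follows. `onNewRecord` and `onRecordSuccess` are the hypothetical callbacks proposed above, and the producer does not invoke them today; the sketch also assumes the producer would report records that finally fail (via `onRecordSuccess` or an equivalent callback), so that entries do not accumulate.

```java
import java.io.Closeable;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical handler-side bookkeeping; R stands in for ProducerRecord<K, V>.
class PendingTimeTracker<R> implements Closeable {
    // Identity keys: two equal-looking records sent separately are tracked separately.
    private final Map<R, Long> firstSeenMs = new IdentityHashMap<>();
    private final LongSupplier clockMs;
    private final long retryTimeoutMs;

    PendingTimeTracker(LongSupplier clockMs, long retryTimeoutMs) {
        this.clockMs = clockMs;
        this.retryTimeoutMs = retryTimeoutMs;
    }

    // Would be called from Producer::send (application threads).
    synchronized void onNewRecord(R record) {
        firstSeenMs.putIfAbsent(record, clockMs.getAsLong());
    }

    // Would be called once the record is acknowledged (possibly another thread).
    synchronized void onRecordSuccess(R record) {
        firstSeenMs.remove(record);
    }

    // Decision for UnknownTopicOrPartitionException: retry while within the timeout.
    synchronized boolean shouldRetryUnknownTopic(R record) {
        Long start = firstSeenMs.get(record);
        if (start == null) {
            return false; // never registered: fail rather than retry blindly
        }
        boolean retry = clockMs.getAsLong() - start <= retryTimeoutMs;
        if (!retry) {
            firstSeenMs.remove(record); // the record will fail, so stop tracking it
        }
        return retry;
    }

    synchronized int trackedCount() {
        return firstSeenMs.size();
    }

    @Override
    public synchronized void close() {
        firstSeenMs.clear();
    }
}
```

An `IdentityHashMap` is used because `ProducerRecord` defines value-based `equals`, so two identical records sent separately would otherwise share one entry; the methods are synchronized on the assumption that `send()` calls and completion notifications arrive on different threads.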
> > >>>>>>>>>>
> > >>>>>>>>>> [1] - https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/ReplicationPolicy.html
> > >>>>>>>>>> [2] - https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html
> > >>>>>>>>>> [3] - https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#SEPARATOR_CONFIG,
> > >>>>>>>>>> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG
> > >>>>>>>>>>
> > >>>>>>>>>> Cheers,
> > >>>>>>>>>>
> > >>>>>>>>>> Chris
> > >>>>>>>>>>
> > >>>>>>>>>> On Wed, May 8, 2024 at 12:37 AM Matthias J. Sax <mj...@apache.org>
> > >>>>>>>>>> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Very interesting discussion.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Seems a central point is the question about "how generic" we approach
> > >>>>>>>>>>> this, and some people think we need to be conservative and others think
> > >>>>>>>>>>> we should try to be as generic as possible.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Personally, I think following a limited scope for this KIP (by
> > >>>>>>>>>>> explicitly saying we only cover RecordTooLarge and
> > >>>>>>>>>>> UnknownTopicOrPartition) might be better. We have concrete tickets that
> > >>>>>>>>>>> we address, while for other exceptions (like authorization) we don't
> > >>>>>>>>>>> know if people want to handle them to begin with. Boiling the ocean
> > >>>>>>>>>>> might not get us too far, and being somewhat pragmatic might help to
> > >>>>>>>>>>> move this KIP forward. -- I also agree with Justine and Artem that we
> > >>>>>>>>>>> want to be careful anyway not to allow users to break stuff too easily.
> > >>>>>>>>>>>
> > >>>>>>>>>>> At the same time, I agree that we should set up this change in a
> > >>>>>>>>>>> forward-looking way, and thus having a single generic handler allows us
> > >>>>>>>>>>> to later extend the handler more easily. This should also simplify
> > >>>>>>>>>>> follow-up KIPs that might add new error cases (I actually mentioned one
> > >>>>>>>>>>> more to Alieh already, but we both agreed that it might be best to
> > >>>>>>>>>>> exclude it from the KIP right now, to make the 3.8 deadline. Doing a
> > >>>>>>>>>>> follow-up KIP is not the end of the world.)
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> @Chris:
> > >>>>>>>>>>>
> > >>>>>>>>>>> (2) This sounds fair to me, but I'm not sure how "bad" it actually would
> > >>>>>>>>>>> be? If the contract is clearly defined, what the KIP proposes seems to be
> > >>>>>>>>>>> fine, and given that such a handler is an expert API, and we can provide
> > >>>>>>>>>>> "best practices" (cf. my other comment below in [AL1]), being a little
> > >>>>>>>>>>> bit pragmatic sounds fine to me.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Not sure if I understand Justine's argument on this question?
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> (3) About having a default impl or not. I am fine with adding one, even
> > >>>>>>>>>>> if I am not sure why Connect could not just add its own one and plug it
> > >>>>>>>>>>> in (and we would add corresponding configs for Connect, but not for the
> > >>>>>>>>>>> producer itself)? For this case, we could also do this as a follow-up
> > >>>>>>>>>>> KIP, but happy to include it in this KIP to provide value to Connect
> > >>>>>>>>>>> right away (even if the value might not come right away if we miss the
> > >>>>>>>>>>> 3.8 deadline due to expanded KIP scope...) -- For KS, we would for sure
> > >>>>>>>>>>> plug in our own impl, and lock down the config such that users cannot
> > >>>>>>>>>>> set their own handler on the internal producer to begin with. Might be
> > >>>>>>>>>>> good to elaborate why the producer should have a default? We might
> > >>>>>>>>>>> actually want to add this to the KIP right away?
> > >>>>>>>>>>>
> > >>>>>>>>>>> The key for a default impl would be to not change the current behavior,
> > >>>>>>>>>>> and having no default seems to achieve this. For the two cases you
> > >>>>>>>>>>> mentioned, it's unclear to me what default value on "upper bound on
> > >>>>>>>>>>> retries" for UnknownTopicOrPartitionException we should set? Seems it
> > >>>>>>>>>>> would need to be the same as `delivery.timeout.ms`? However, if users
> > >>>>>>>>>>> have `delivery.timeout.ms` actually overridden, we would need to set
> > >>>>>>>>>>> this config somewhat "dynamically"? Is this feasible? If we hard-code 2
> > >>>>>>>>>>> minutes, it might not be backward compatible. I have the impression we
> > >>>>>>>>>>> might introduce some undesired coupling? -- For the "record too large"
> > >>>>>>>>>>> case, the config seems to be boolean and setting it to `false` by
> > >>>>>>>>>>> default seems to provide backward compatibility.
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> @Artem:
> > >>>>>>>>>>>
> > >>>>>>>>>>> [AL1] While I see the point, I would think having a different callback
> > >>>>>>>>>>> for every exception might not really be elegant? In the end, the handler
> > >>>>>>>>>>> is a very advanced feature anyway, and if it's implemented in a bad way,
> > >>>>>>>>>>> well, it's a user error -- we cannot protect users from everything.
> > >>>>>>>>>>> To me, a handler like this is to some extent "business logic", and if a
> > >>>>>>>>>>> user gets business logic wrong, it's hard to protect them. -- We would
> > >>>>>>>>>>> of course provide best practice guidance in the JavaDocs, and explain
> > >>>>>>>>>>> that a handler should have explicit `if` statements for stuff it wants
> > >>>>>>>>>>> to handle, and only a single default which returns FAIL.
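The guidance in [AL1] (explicit `if` statements for each handled error, plus a single default that returns FAIL) might look like the following Java sketch for a single generic handle method, one of the designs debated in this thread. Everything here is hypothetical: the exception classes are local stand-ins, and the `clickstream` policy is only an example.

```java
// Local stand-ins for the org.apache.kafka.common.errors types.
class RecordTooLargeException extends RuntimeException { }

class UnknownTopicOrPartitionException extends RuntimeException { }

enum GenericResponse { FAIL, SWALLOW, RETRY }

class ExplicitCasesHandler {
    GenericResponse handle(Exception exception, String topic) {
        if (exception instanceof RecordTooLargeException) {
            // Example policy: losing oversized clickstream events is acceptable.
            return "clickstream".equals(topic) ? GenericResponse.SWALLOW : GenericResponse.FAIL;
        }
        if (exception instanceof UnknownTopicOrPartitionException) {
            return GenericResponse.RETRY;
        }
        // Single catch-all: any error type not named above, including ones a
        // future release might route here, fails instead of being dropped.
        return GenericResponse.FAIL;
    }
}
```

The catch-all branch is what answers the concern that a handler written for two errors might silently mishandle errors added later: anything it does not name fails.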
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> [AL2] Yes, but for KS we would retry at the application layer. I.e., the
> > >>>>>>>>>>> TX is not completed, we clean up and set up our task from scratch, to
> > >>>>>>>>>>> ensure the pending transaction is completed before we resume. If the TX
> > >>>>>>>>>>> was indeed aborted, we would retry from an older offset and thus just
> > >>>>>>>>>>> hit the same error again, and the loop begins again.
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> [AL2 cont.] Similar to AL1, I see such a handler to some extent as
> > >>>>>>>>>>> business logic. If a user puts a bad filter condition in their KS app,
> > >>>>>>>>>>> and drops messages, there is nothing we can do about it, and this
> > >>>>>>>>>>> handler, IMHO, has a similar purpose. This is also the line of thinking
> > >>>>>>>>>>> I apply to EOS, to address Justine's concern about "should we allow to
> > >>>>>>>>>>> drop for EOS", and my answer is "yes", because it's more business logic
> > >>>>>>>>>>> than actual error handling IMHO. And by default, we fail... So users
> > >>>>>>>>>>> opt in to add business logic to drop records. It's an application-level
> > >>>>>>>>>>> decision how to write the code.
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> [AL3] Maybe I misunderstand what you are saying, but to me, checking the
> > >>>>>>>>>>> size of the record upfront is exactly what the KIP proposes? No?
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> @Justine:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> I saw the sample code -- is it just an if statement checking for the
> > >>>>>>>>>>>> error before the handler is invoked? That seems a bit fragile.
> > >>>>>>>>>>>
> > >>>>>>>>>>> What do you mean by fragile? Not sure if I see your point.
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> -Matthias
> > >>>>>>>>>>>
> > >>>>>>>>>>> On 5/7/24 5:33 PM, Artem Livshits wrote:
> > >>>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Thanks for the KIP. The motivation talks about very specific cases, but
> > >>>>>>>>>>>> the interface is generic.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> [AL1]
> > >>>>>>>>>>>> If the interface evolves in the future I think we could have the
> > >>>>>>>>>>>> following confusion:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 1. A user implemented a SWALLOW action for both RecordTooLargeException
> > >>>>>>>>>>>> and UnknownTopicOrPartitionException. For simplicity, they just return
> > >>>>>>>>>>>> SWALLOW from the function, because it elegantly handles all known cases.
> > >>>>>>>>>>>> 2. The interface has evolved to support a new exception.
> > >>>>>>>>>>>> 3. The user has upgraded their Kafka client.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Now a new kind of error gets dropped on the floor without the user's
> > >>>>>>>>>>>> intention, and it would be super hard to detect and debug.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> To avoid the confusion, I think we should use handlers for specific
> > >>>>>>>>>>>> exceptions. Then, if a new exception is added, it won't get silently
> > >>>>>>>>>>>> swallowed, because the user would need to add new functionality to
> > >>>>>>>>>>>> handle it.
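Artem's alternative, one callback per supported exception, could look roughly like the following sketch. All names are hypothetical. Each callback defaults to FAIL. An exception type without a dedicated callback never reaches user code, so a client upgrade cannot route a new error into a blanket SWALLOW.

```java
// Hypothetical names throughout; only the shape of the idea is illustrated.
enum PerExceptionResponse { FAIL, RETRY, SWALLOW }

// Local stand-ins for the Kafka exception classes so the sketch compiles alone.
class RecordTooLargeException extends RuntimeException { }

class UnknownTopicOrPartitionException extends RuntimeException { }

// One callback per supported exception; every callback defaults to FAIL,
// so a user opts in only to the cases they explicitly override.
interface PerExceptionHandler {
    default PerExceptionResponse onRecordTooLarge(String topic, RecordTooLargeException e) {
        return PerExceptionResponse.FAIL;
    }

    default PerExceptionResponse onUnknownTopicOrPartition(
            String topic, UnknownTopicOrPartitionException e) {
        return PerExceptionResponse.FAIL;
    }
}

// Hypothetical producer-side dispatch: an exception type without a dedicated
// callback never reaches user code, so it cannot be swallowed by accident.
final class PerExceptionDispatcher {
    private PerExceptionDispatcher() { }

    static PerExceptionResponse dispatch(PerExceptionHandler handler, String topic, Exception e) {
        if (e instanceof RecordTooLargeException) {
            return handler.onRecordTooLarge(topic, (RecordTooLargeException) e);
        }
        if (e instanceof UnknownTopicOrPartitionException) {
            return handler.onUnknownTopicOrPartition(topic, (UnknownTopicOrPartitionException) e);
        }
        return PerExceptionResponse.FAIL;
    }
}
```

The trade-off is that every newly supported exception requires a new interface method. That requirement is also what makes the opt-in explicit.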
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I also have some higher-level comments:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> [AL2]
> > >>>>>>>>>>>>> it throws a TimeoutException, and the user can only blindly retry, which
> > >>>>>>>>>>>>> may result in an infinite retry loop
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> If the TimeoutException happens during transactional processing (exactly
> > >>>>>>>>>>>> once is the desired semantics), then the client should not retry when it
> > >>>>>>>>>>>> gets a TimeoutException, because without knowing the reason for the
> > >>>>>>>>>>>> TimeoutException, the client cannot know whether the message actually got
> > >>>>>>>>>>>> produced or not, and retrying the message may result in duplicates.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> The thrown TimeoutException "cuts" the connection to the underlying root
> > >>>>>>>>>>>>> cause of missing metadata
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Maybe we should fix the error handling and return the proper underlying
> > >>>>>>>>>>>> message? Then the application can properly handle the message based on
> > >>>>>>>>>>>> preferences.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> From the product perspective, it's not clear how safe it is to blindly
> > >>>>>>>>>>>> ignore UnknownTopicOrPartitionException. This could lead to situations
> > >>>>>>>>>>>> where a simple typo leads to massive data loss (part of the data would
> > >>>>>>>>>>>> effectively be produced to a "black hole", and the user may not notice
> > >>>>>>>>>>>> it for a while).
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> In which situations would you recommend the user to "black hole" messages
> > >>>>>>>>>>>> in case of misconfiguration?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> [AL3]
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> If the custom handler decides on SWALLOW for RecordTooLargeException,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Is my understanding correct that this KIP proposes functionality that
> > >>>>>>>>>>>> would only be able to SWALLOW RecordTooLargeExceptions that happen because
> > >>>>>>>>>>>> the producer cannot produce the record (if the broker rejects the batch,
> > >>>>>>>>>>>> the error won't get to the handler, because we cannot know which other
> > >>>>>>>>>>>> records get ignored)? In this case, why not just check the locally
> > >>>>>>>>>>>> configured max record size upfront and not produce the record in the
> > >>>>>>>>>>>> first place? Maybe we can expose a validation function from the producer
> > >>>>>>>>>>>> that could validate the records locally, so we don't need to produce the
> > >>>>>>>>>>>> record in order to know that it's invalid.
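A rough sketch of the upfront check suggested in [AL3]. It assumes the relevant limit is the producer's configured `max.request.size`. The class and the flat `ASSUMED_OVERHEAD_BYTES` allowance are hypothetical. The real producer estimates the serialized size (including headers and batch framing) differently, so this only shows where such a validation function would sit.

```java
// Hypothetical validator; not part of the Kafka producer API.
final class LocalRecordValidator {
    // ASSUMPTION: flat allowance for per-record and batch framing overhead.
    // The real producer computes its own, different size estimate.
    static final int ASSUMED_OVERHEAD_BYTES = 128;

    private final int maxRequestSize; // e.g. the configured max.request.size

    LocalRecordValidator(int maxRequestSize) {
        this.maxRequestSize = maxRequestSize;
    }

    // Rough size estimate from the serialized key and value (headers ignored).
    int estimatedSize(byte[] key, byte[] value) {
        int size = ASSUMED_OVERHEAD_BYTES;
        if (key != null) {
            size += key.length;
        }
        if (value != null) {
            size += value.length;
        }
        return size;
    }

    // True if the record is expected to pass the size limit, so the caller can
    // skip (or divert) an oversized record before ever calling send().
    boolean fits(byte[] key, byte[] value) {
        return estimatedSize(key, value) <= maxRequestSize;
    }
}
```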
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> -Artem
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Tue, May 7, 2024 at 2:07 PM Justine Olshan <jols...@confluent.io.invalid>
> > >>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Alieh and Chris,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Thanks for clarifying 1) but I saw the motivation. I guess I just didn't
> > >>>>>>>>>>>>> understand how that would be ensured on the producer side. I saw the
> > >>>>>>>>>>>>> sample code -- is it just an if statement checking for the error before
> > >>>>>>>>>>>>> the handler is invoked? That seems a bit fragile.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Can you clarify what you mean by `since the code does not reach the KS
> > >>>>>>>>>>>>> interface and breaks somewhere in producer.` If we surfaced this error to
> > >>>>>>>>>>>>> the application in a better way, would that also be a solution to the
> > >>>>>>>>>>>>> issue?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Justine
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On Tue, May 7, 2024 at 1:55 PM Alieh Saeedi <asae...@confluent.io.invalid>
> > >>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Hi,
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Thank you, Chris and Justine, for the feedback.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> @Chris
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 1) Flexibility: it has two meanings. The first meaning is the one you
> > >>>>>>>>>>>>>> mentioned. We are going to cover more exceptions in the future, but as
> > >>>>>>>>>>>>>> Justine mentioned, we must be very conservative about adding more
> > >>>>>>>>>>>>>> exceptions. Additionally, flexibility mainly means that the user is able
> > >>>>>>>>>>>>>> to develop their own code. As mentioned in the motivation section and the
> > >>>>>>>>>>>>>> examples, sometimes the user decides on dropping a record based on the
> > >>>>>>>>>>>>>> topic, for example.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 2) Defining two separate methods for retriable and non-retriable
> > >>>>>>>>>>>>>> exceptions: although the idea is brilliant, the user may still make a
> > >>>>>>>>>>>>>> mistake by implementing the wrong method and see unexpected behaviour.
> > >>>>>>>>>>>>>> For example, he may implement handleRetriable() for
> > >>>>>>>>>>>>>> RecordTooLargeException and define SWALLOW for the exception, but in
> > >>>>>>>>>>>>>> practice, he sees no change in default behaviour since he implemented the
> > >>>>>>>>>>>>>> wrong method. I think we can never reduce the user’s mistakes to 0.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 3) Default implementation for Handler: the default behaviour is already
> > >>>>>>>>>>>>>> preserved with NO need to implement any handler or set the corresponding
> > >>>>>>>>>>>>>> config parameter `custom.exception.handler`. What you mean is actually
> > >>>>>>>>>>>>>> having a second default, which requires having both interface and config
> > >>>>>>>>>>>>>> parameters. About UnknownTopicOrPartitionException: the producer already
> > >>>>>>>>>>>>>> offers the config parameter `max.block.ms`, which determines the duration
> > >>>>>>>>>>>>>> of retrying. The main purpose of the user who needs the handler is to get
> > >>>>>>>>>>>>>> the root cause of TimeoutException and handle it in the way he intends.
> > >>>>>>>>>>>>>> The KIP explains the necessity of it for KS users.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 4) Naming issue: By SWALLOW, we actually meant swallowing the error,
> > >>>>>>>>>>>>>> while SKIP means skipping the record, I think. If it makes sense for more
> > >>>>>>>>>>>>>> people, I can change it to SKIP.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> @Justine
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 1) was addressed by Chris.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 2 and 3) The problem is exactly what you mentioned. Currently, there is
> > >>>>>>>>>>>>>> no way to handle these issues application-side. Even KS users who
> > >>>>>>>>>>>>>> implement KS ProductionExceptionHandler are not able to handle the
> > >>>>>>>>>>>>>> exceptions as they intend since the code does not reach the KS interface
> > >>>>>>>>>>>>>> and breaks somewhere in producer.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On Tue, May 7, 2024 at 8:43 PM Chris Egerton <fearthecel...@gmail.com>
> > >>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Hi Justine,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> The method signatures for the interface are indeed open-ended, but the
> > >>>>>>>>>>>>>>> KIP states that its uses will be limited. See the motivation section:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> We believe that the user should be able to develop custom exception
> > >>>>>>>>>>>>>>>> handlers for managing producer exceptions. On the other hand, this will
> > >>>>>>>>>>>>>>>> be an expert-level API, and using that may result in strange behaviour in
> > >>>>>>>>>>>>>>>> the system, making it hard to find the root cause. Therefore, the custom
> > >>>>>>>>>>>>>>>> handler is currently limited to handling RecordTooLargeException and
> > >>>>>>>>>>>>>>>> UnknownTopicOrPartitionException.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Chris
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Tue, May 7, 2024, 14:37 Justine Olshan <jols...@confluent.io.invalid>
> > >>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I was out for KSB and then was also sick. :(
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> To your point 1) Chris, I don't think it is limited to two specific
> > >>>>>>>>>>>>>>>> scenarios, since the interface accepts a generic Exception e and can be
> > >>>>>>>>>>>>>>>> implemented to check if that e is an instanceof any exception. I didn't
> > >>>>>>>>>>>>>>>> see anywhere that specific errors are enforced. I'm a bit concerned about
> > >>>>>>>>>>>>>>>> this actually. I'm concerned about the open-endedness and the contract we
> > >>>>>>>>>>>>>>>> have with transactions. We are allowing the client to make decisions that
> > >>>>>>>>>>>>>>>> are somewhat invisible to the server. As an aside, can we build in log
> > >>>>>>>>>>>>>>>> messages when the handler decides to skip (etc.) a message? I'm really
> > >>>>>>>>>>>>>>>> concerned about messages being silently dropped.
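One way to address the request for log messages is a decorator around whatever handler the user supplies. The decorator logs and counts every SWALLOW decision. This is a hypothetical sketch using `java.util.logging`. The enum, the interface, and the counter are illustrative and not part of the KIP.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

// Hypothetical response type and handler interface, for illustration only.
enum DecisionResponse { FAIL, RETRY, SWALLOW }

interface DecisionHandler {
    DecisionResponse handle(String topic, Exception exception);
}

// Decorator that makes every dropped record visible in the logs and in a counter.
final class LoggingHandler implements DecisionHandler {
    private static final Logger LOG = Logger.getLogger(LoggingHandler.class.getName());

    private final DecisionHandler delegate;
    private final AtomicLong droppedRecords = new AtomicLong();

    LoggingHandler(DecisionHandler delegate) {
        this.delegate = delegate;
    }

    @Override
    public DecisionResponse handle(String topic, Exception exception) {
        DecisionResponse response = delegate.handle(topic, exception);
        if (response == DecisionResponse.SWALLOW) {
            long total = droppedRecords.incrementAndGet();
            LOG.warning("Dropping record for topic " + topic + " after "
                    + exception.getClass().getSimpleName()
                    + " (" + total + " record(s) dropped so far)");
        }
        return response;
    }

    long droppedRecords() {
        return droppedRecords.get();
    }
}
```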
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I do think Chris's point 2) about retriable vs. non-retriable errors is
> > >>>>>>>>>>>>>>>> fair. I'm a bit concerned about skipping an unknown topic or partition
> > >>>>>>>>>>>>>>>> exception too early, as there are cases where it can be transient.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I'm still a little bit wary of allowing dropping records as part of EOS
> > >>>>>>>>>>>>>>>> generally, as in many cases these errors signify an issue with the
> > >>>>>>>>>>>>>>>> original data. I understand that Streams and Connect/MirrorMaker may have
> > >>>>>>>>>>>>>>>> reasons they want to progress past these messages, but I'm wondering if
> > >>>>>>>>>>>>>>>> there is a way that can be done application-side. I'm willing to accept
> > >>>>>>>>>>>>>>>> this sort of proposal if we can make it clear that this sort of thing is
> > >>>>>>>>>>>>>>>> happening and we limit the blast radius for what we can do.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Justine
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> On Tue, May 7, 2024 at 9:55 AM Chris Egerton <chr...@aiven.io.invalid>
> > >>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Sorry for the delay, I've been out sick. I still have some thoughts that
> > >>>>>>>>>>>>>>>>> I'd like to see addressed before voting.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> 1) If flexibility is the motivation for a pluggable interface, why are we
> > >>>>>>>>>>>>>>>>> only limiting the uses for this interface to two very specific scenarios?
> > >>>>>>>>>>>>>>>>> Why not also allow, e.g., authorization errors to be handled as well
> > >>>>>>>>>>>>>>>>> (allowing users to drop records destined for some off-limits topics, or
> > >>>>>>>>>>>>>>>>> retry for a limited duration in case there's a delay in the propagation
> > >>>>>>>>>>>>>>>>> of ACL updates)? It'd be nice to see some analysis of other errors that
> > >>>>>>>>>>>>>>>>> could be handled with this new API, both to avoid the follow-up work of
> > >>>>>>>>>>>>>>>>> another KIP to address them in the future, and to make sure that we're
> > >>>>>>>>>>>>>>>>> not painting ourselves into a corner with the current API in a way that
> > >>>>>>>>>>>>>>>>> would make future modifications difficult.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> 2) Something feels a bit off with how retriable vs. non-retriable errors
> > >>>>>>>>>>>>>>>>> are handled with the interface. Why not introduce two separate methods to
> > >>>>>>>>>>>>>>>>> handle each case separately? That way there's no ambiguity or implicit
> > >>>>>>>>>>>>>>>>> behavior when, e.g., attempting to retry on a RecordTooLargeException.
> > >>>>>>>>>>>>>>>>> This could be something like `NonRetriableResponse handle(ProducerRecord,
> > >>>>>>>>>>>>>>>>> Exception)` and `RetriableResponse handleRetriable(ProducerRecord,
> > >>>>>>>>>>>>>>>>> Exception)`, though the exact names and shape can obviously be toyed with
> > >>>>>>>>>>>>>>>>> a bit.
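Chris's two-method shape can be sketched with two response enums. The non-retriable enum simply has no RETRY constant, so retrying a RecordTooLargeException cannot even be expressed. The method names follow the email, but everything else is hypothetical: a `String topic` parameter stands in for `ProducerRecord`, and both methods default to FAIL. The stand-in exception classes mirror, in simplified form, the Kafka client hierarchy, in which UnknownTopicOrPartitionException is a RetriableException and RecordTooLargeException is not.

```java
// Hypothetical response types: the non-retriable one has no RETRY constant.
enum NonRetriableResponse { FAIL, SWALLOW }

enum RetriableResponse { FAIL, RETRY, SWALLOW }

// Simplified local stand-ins mirroring the Kafka client's exception hierarchy.
class RetriableException extends RuntimeException { }

class UnknownTopicOrPartitionException extends RetriableException { }

class RecordTooLargeException extends RuntimeException { }

// Hypothetical two-method handler; both methods default to FAIL.
interface SplitExceptionHandler {
    default NonRetriableResponse handle(String topic, Exception exception) {
        return NonRetriableResponse.FAIL;
    }

    default RetriableResponse handleRetriable(String topic, Exception exception) {
        return RetriableResponse.FAIL;
    }
}

// What the producer would do internally (hypothetical).
enum ProducerAction { FAIL, RETRY, DROP }

final class SplitDispatcher {
    private SplitDispatcher() { }

    static ProducerAction decide(SplitExceptionHandler handler, String topic, Exception e) {
        if (e instanceof RetriableException) {
            switch (handler.handleRetriable(topic, e)) {
                case RETRY:
                    return ProducerAction.RETRY;
                case SWALLOW:
                    return ProducerAction.DROP;
                default:
                    return ProducerAction.FAIL;
            }
        }
        // Non-retriable errors can only ever end in FAIL or DROP.
        return handler.handle(topic, e) == NonRetriableResponse.SWALLOW
                ? ProducerAction.DROP
                : ProducerAction.FAIL;
    }
}
```

The sketch also reproduces the concern raised in Alieh's reply above. A handler that only overrides `handleRetriable()` has no effect on a RecordTooLargeException, which still fails.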
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> 3) Although the flexibility of a pluggable interface may benefit some
> > >>>>>>>>>>>>>>>>> users' custom producer applications and Kafka Streams applications, it
> > >>>>>>>>>>>>>>>>> comes at a significant deployment cost for other low-/no-code
> > >>>>>>>>>>>>>>>>> environments, including but not limited to Kafka Connect and
> > >>>>>>>>>>>>>>>>> MirrorMaker 2. Can we add a default implementation of the exception
> > >>>>>>>>>>>>>>>>> handler that allows for some simple behavior to be tweaked via a
> > >>>>>>>>>>>>>>>>> configuration property? Two things that would be nice to have would be
> > >>>>>>>>>>>>>>>>> A) an upper bound on the retry time for unknown-topic-partition
> > >>>>>>>>>>>>>>>>> exceptions and B) an option to drop records that are large enough to
> > >>>>>>>>>>>>>>>>> trigger a record-too-large exception.
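Chris's A) and B) could be served by a single configurable default handler, sketched below under stated assumptions. Bounding the retry time requires remembering when each record first failed. This sketch keeps that state in an identity map and assumes the producer passes the same record object on every retry; that is an assumption, not documented behavior. The constructor parameters stand in for configuration properties whose names are not specified in this thread, and the clock is injectable for testing.

```java
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical response type and local exception stand-ins.
enum DefaultResponse { FAIL, RETRY, SWALLOW }

class RecordTooLargeException extends RuntimeException { }

class UnknownTopicOrPartitionException extends RuntimeException { }

// Hypothetical config-driven default handler covering A) and B).
// ASSUMPTION: the producer passes the same record object on every retry.
final class ConfigurableDefaultHandler {
    private final long retryTimeoutMs;   // A) upper bound on unknown-topic retries
    private final boolean dropOversized; // B) drop records that are too large
    private final LongSupplier clockMs;  // injectable clock, e.g. System::currentTimeMillis
    private final Map<Object, Long> firstFailureMs = new IdentityHashMap<>();

    ConfigurableDefaultHandler(long retryTimeoutMs, boolean dropOversized, LongSupplier clockMs) {
        this.retryTimeoutMs = retryTimeoutMs;
        this.dropOversized = dropOversized;
        this.clockMs = clockMs;
    }

    synchronized DefaultResponse handle(Object producerRecord, Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return dropOversized ? DefaultResponse.SWALLOW : DefaultResponse.FAIL;
        }
        if (exception instanceof UnknownTopicOrPartitionException) {
            long now = clockMs.getAsLong();
            // Remember when this record first hit the error.
            long first = firstFailureMs.computeIfAbsent(producerRecord, r -> now);
            if (now - first < retryTimeoutMs) {
                return DefaultResponse.RETRY;
            }
            firstFailureMs.remove(producerRecord); // deadline passed: stop tracking and fail
            return DefaultResponse.FAIL;
        }
        return DefaultResponse.FAIL;
    }
}
```

In this sketch, a record that eventually succeeds is never removed from the map, because the handler is not told about successes. A real implementation would need such a hook or an expiry policy.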
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> 4) I'd still prefer to see "SKIP" or "DROP" instead of the proposed
> > >>>>>>>>>>>>>>>>> "SWALLOW" option, which IMO is opaque and non-obvious, especially when
> > >>>>>>>>>>>>>>>>> trying to guess the behavior for retriable errors.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Chris
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> On Fri, May 3, 2024 at 11:23 AM Alieh Saeedi <asae...@confluent.io.invalid>
> > >>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> A summary of the KIP and the discussions:
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> The KIP introduces a handler interface for the Producer in order to
> > >>>>>>>>>>>>>>>>>> handle two exceptions: RecordTooLargeException and
> > >>>>>>>>>>>>>>>>>> UnknownTopicOrPartitionException. The handler handles the exceptions per
> > >>>>>>>>>>>>>>>>>> record.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> - Do we need this handler? [Motivation and Examples sections]
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> RecordTooLargeException: 1) In transactions, the producer collects
> > >>>>>>>>>>>>>>>>>> multiple records in batches. Then a RecordTooLargeException related to a
> > >>>>>>>>>>>>>>>>>> single record leads to failing the entire batch. A custom exception
> > >>>>>>>>>>>>>>>>>> handler in this case may decide on dropping the record and continuing the
> > >>>>>>>>>>>>>>>>>> processing. See Example 1, please. 2) Moreover, in Kafka Streams, a record
> > >>>>>>>>>>>>>>>>>> that is too large is a poison pill record, and there is no way to skip
> > >>>>>>>>>>>>>>>>>> over it. A handler would allow us to react to this error inside the
> > >>>>>>>>>>>>>>>>>> producer, i.e., local to where the error happens, and thus simplify the
> > >>>>>>>>>>>>>>>>>> overall code significantly. Please read the Motivation section for more
> > >>>>>>>>>>>>>>>>>> explanation.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> UnknownTopicOrPartitionException: For this case, the producer handles
> > >>>>>>>>>>>>>>>>>> this exception internally, only issues a WARN log about missing metadata,
> > >>>>>>>>>>>>>>>>>> and retries internally. Later, when the producer hits
> > >>>>>>>>>>>>>>>>>> "delivery.timeout.ms", it throws a TimeoutException, and the user can
> > >>>>>>>>>>>>>>>>>> only blindly retry, which may result in an infinite retry loop. The thrown
> > >>>>>>>>>>>>>>>>>> TimeoutException "cuts" the connection to the underlying root cause of
> > >>>>>>>>>>>>>>>>>> missing metadata (which could indeed be a transient error but is
> > >>>>>>>>>>>>>>>>>> persistent for a non-existing topic). Thus, there is no programmatic way
> > >>>>>>>>>>>>>>>>>> to break the infinite retry loop. Kafka Streams also blindly retries for
> > >>>>>>>>>>>>>>>>>> this case, and the application gets stuck.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> - Having interface vs configuration option: [Motivation, Examples, and
> > >>>>>>>>>>>>>>>>>> Rejected Alternatives sections]
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Our solution is introducing an interface due to the full flexibility that
> > >>>>>>>>>>>>>>>>>> it offers. Sometimes users, especially Kafka Streams ones, determine the
> > >>>>>>>>>>>>>>>>>> handler's behaviour based on the situation. For example, facing
> > >>>>>>>>>>>>>>>>>> UnknownTopicOrPartitionException, the user may want to raise an error for
> > >>>>>>>>>>>>>>>>>> some topics but retry it for other topics. Having a configuration option
> > >>>>>>>>>>>>>>>>>> with a fixed set of possibilities does not serve the user's needs. See
> > >>>>>>>>>>>>>>>>>> Example 2, please.
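Example 2 itself is not reproduced in this thread. The following is therefore only a hypothetical illustration of the per-topic decision described above: fail fast on UnknownTopicOrPartitionException for topics that must already exist, and keep retrying for all others. The fail-fast topic set and all type names are assumptions.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical response type and local exception stand-in.
enum TopicResponse { FAIL, RETRY, SWALLOW }

class UnknownTopicOrPartitionException extends RuntimeException { }

// Hypothetical handler whose decision depends on the record's topic.
final class PerTopicHandler {
    private final Set<String> failFastTopics;

    PerTopicHandler(Set<String> failFastTopics) {
        this.failFastTopics = new HashSet<>(failFastTopics);
    }

    TopicResponse handle(String topic, Exception exception) {
        if (exception instanceof UnknownTopicOrPartitionException) {
            // Topics that must already exist: a missing one is likely a typo or
            // misconfiguration, so surface it. Other topics may still be created.
            return failFastTopics.contains(topic) ? TopicResponse.FAIL : TopicResponse.RETRY;
        }
        return TopicResponse.FAIL;
    }
}
```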
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> - Note on RecordTooLargeException: [Public Interfaces section]
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> If the custom handler decides on SWALLOW for RecordTooLargeException,
> > >>>>>>>>>>>>>>>>>> then this record will not be a part of the batch of transactions and will
> > >>>>>>>>>>>>>>>>>> also not be sent to the broker in non-transactional mode. So no worries
> > >>>>>>>>>>>>>>>>>> about getting a RecordTooLargeException from the broker in this case, as
> > >>>>>>>>>>>>>>>>>> the record will never ever be sent to the broker. SWALLOW means drop the
> > >>>>>>>>>>>>>>>>>> record and continue/swallow the error.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> - What if the handle() method implements RETRY for
> > >>>>>>>>>>>>>>>>>> RecordTooLargeException? [Proposed Changes section]
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> We have to limit the user to only have FAIL or SWALLOW for
> > >>>>>>>>>>>>>>>>>> RecordTooLargeException. Actually, RETRY must be treated as FAIL. This is
> > >>>>>>>>>>>>>>>>>> well documented in the javadoc.
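The two notes above can be sketched as a heavily simplified, hypothetical append path. The first note says SWALLOW keeps the record out of the batch; the second says RETRY is treated as FAIL. A single byte-length comparison stands in for the producer's real size check, and all names are illustrative. Only the control flow is the point.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical response type; local stand-in for the Kafka exception class.
enum SendResponse { FAIL, RETRY, SWALLOW }

class RecordTooLargeException extends RuntimeException {
    RecordTooLargeException(String message) {
        super(message);
    }
}

interface OversizeHandler {
    SendResponse handle(String topic, RecordTooLargeException exception);
}

// Heavily simplified, hypothetical append path: the size check runs before the
// record joins the batch, so a swallowed record never reaches the broker.
final class SketchBatchAppender {
    private final int maxRecordSize;
    private final OversizeHandler handler;
    private final List<byte[]> batch = new ArrayList<>();

    SketchBatchAppender(int maxRecordSize, OversizeHandler handler) {
        this.maxRecordSize = maxRecordSize;
        this.handler = handler;
    }

    void append(String topic, byte[] value) {
        if (value.length > maxRecordSize) {
            RecordTooLargeException error = new RecordTooLargeException(
                    value.length + " bytes exceeds the limit of " + maxRecordSize);
            if (handler.handle(topic, error) == SendResponse.SWALLOW) {
                return; // dropped: not added to the batch, never sent
            }
            // FAIL, and also RETRY: the limit cannot change for this producer
            // instance, so retrying could never succeed.
            throw error;
        }
        batch.add(value);
    }

    int batchSize() {
        return batch.size();
    }
}
```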
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> - What if the handle() method of the handler throws an exception?
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> The handler is expected to have correct code. If it throws an exception,
> > >>>>>>>>>>>>>>>>>> everything fails.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> This is a PoC PR <https://github.com/apache/kafka/pull/15846> ONLY for
> > >>>>>>>>>>>>>>>>>> RecordTooLargeException. The code changes related to
> > >>>>>>>>>>>>>>>>>> UnknownTopicOrPartitionException will be added to this PR LATER.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Looking forward to your feedback again.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> On Thu, Apr 25, 2024 at 11:46 PM Kirk True <k...@kirktrue.pro> wrote:
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Thanks for the updates!
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Comments inline...
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> On Apr 25, 2024, at 1:10 PM, Alieh Saeedi <asae...@confluent.io.INVALID>
> > >>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Thanks a lot for the constructive feedback!
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Addressing some of the main concerns:
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> - The `RecordTooLargeException` can be thrown by the broker, the
> > >>>>>>>>>>>>>>>>>>>> producer, and the consumer. Of course, the `ProducerExceptionHandler`
> > >>>>>>>>>>>>>>>>>>>> interface is introduced to affect only the exceptions thrown from the
> > >>>>>>>>>>>>>>>>>>>> producer. This KIP very specifically means to provide a possibility to
> > >>>>>>>>>>>>>>>>>>>> manage the `RecordTooLargeException` thrown from the Producer.send()
> > >>>>>>>>>>>>>>>>>>>> method. Please see the “Proposed Changes” section for more clarity. I
> > >>>>>>>>>>>>>>>>>>>> investigated the issue there thoroughly. I hope it can explain the
> > >>>>>>>>>>>>>>>>>>>> concern about how we handle the errors as well.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> - The problem with Callback: Methods of Callback are called when the
> > >>>>>>>>>>>>>>>>>>>> record sent to the server is acknowledged, while this is not the desired
> > >>>>>>>>>>>>>>>>>>>> time for all exceptions. We intend to handle exceptions beforehand.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> I guess it makes sense to keep the expectation for when Callback is
> > >>>>>>>>>>>>>>>>>>> invoked as-is vs. shoehorning more into it.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> - What if the custom handler returns RETRY for
> > >>>>>>>>>>>>>>>>>>>> `RecordTooLargeException`? I assume changing the producer configuration
> > >>>>>>>>>>>>>>>>>>>> at runtime is possible. If so, RETRY for a too-large record is valid,
> > >>>>>>>>>>>>>>>>>>>> because maybe in the next try, the too-large record is not poisoning
> > >>>>>>>>>>>>>>>>>>>> anymore. I am not 100% sure about the technical details, though.
> > >>>>>>>>>>>>>>>>>>>> Otherwise, we can consider the RETRY as FAIL for this exception. Another
> > >>>>>>>>>>>>>>>>>>>> solution would be to allow RETRY only a constant number of times, which
> > >>>>>>>>>>>>>>>>>>>> can be useful for other exceptions as well.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> It’s not presently possible to change the configuration of an existing
> > >>>>>>>>>>>>>>>>>>> Producer at runtime. So if a record hits a RecordTooLargeException once,
> > >>>>>>>>>>>>>>>>>>> no amount of retrying (with the current Producer) will change that fact.
> > >>>>>>>>>>>>>>>>>>> So I’m still a little stuck on how to handle a response of RETRY for an
> > >>>>>>>>>>>>>>>>>>> “oversized” record.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> - What if the handle() method itself throws an exception? I
> > >>>>>>>>>>>>>>>>>>>> think rationally and pragmatically, the behaviour must be
> > >>>>>>>>>>>>>>>>>>>> exactly like when no custom handler is defined, since the
> > >>>>>>>>>>>>>>>>>>>> user actually did not have a working handler.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> I’m not convinced that ignoring an errant handler is the
> > >>>>>>>>>>>>>>>>>>> right choice. It then becomes a silent failure that might
> > >>>>>>>>>>>>>>>>>>> have repercussions, depending on the business logic. A user
> > >>>>>>>>>>>>>>>>>>> would have to proactively trawl through the logs for
> > >>>>>>>>>>>>>>>>>>> WARN/ERROR messages to catch it.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Throwing a hard error is pretty draconian, though…
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> - Why not use config parameters instead of an interface? As
> > >>>>>>>>>>>>>>>>>>>> explained in the “Rejected Alternatives” section, we assume
> > >>>>>>>>>>>>>>>>>>>> that the handler will be used for a greater number of
> > >>>>>>>>>>>>>>>>>>>> exceptions in the future. Defining a configuration parameter
> > >>>>>>>>>>>>>>>>>>>> for each exception may make the configuration a bit messy.
> > >>>>>>>>>>>>>>>>>>>> Moreover, the handler offers more flexibility.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Agreed that the logic-via-configuration approach is weird and
> > >>>>>>>>>>>>>>>>>>> limiting. Forget I ever suggested it ;)
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> I’d think additional background in the Motivation section
> > >>>>>>>>>>>>>>>>>>> would help me understand how users might use this feature
> > >>>>>>>>>>>>>>>>>>> beyond a) skipping “oversized” records, and b) not retrying
> > >>>>>>>>>>>>>>>>>>> missing topics.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Small change:
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> - ProductionExceptionHandlerResponse -> Response, for brevity
> > >>>>>>>>>>>>>>>>>>>> and simplicity. Could have been HandlerResponse too, I think!
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> The name change sounds good to me.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Thanks Alieh!
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> I thank you all again for your useful questions/suggestions.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> I would be happy to hear more of your concerns, as stated in
> > >>>>>>>>>>>>>>>>>>>> some feedback.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> On Wed, Apr 24, 2024 at 12:31 AM Justine Olshan
> > >>>>>>>>>>>>>>>>>>>> <jols...@confluent.io.invalid> wrote:
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Thanks Alieh for the updates.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> I'm a little concerned about the design pattern here. It seems
> > >>>>>>>>>>>>>>>>>>>>> like we want specific usages, but we are packaging it as a
> > >>>>>>>>>>>>>>>>>>>>> generic handler.
> > >>>>>>>>>>>>>>>>>>>>> I think we tried to narrow down on the specific errors we want
> > >>>>>>>>>>>>>>>>>>>>> to handle, but it feels a little clunky as we have a generic
> > >>>>>>>>>>>>>>>>>>>>> thing for two specific errors.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> I'm wondering if we are using the right patterns to solve these
> > >>>>>>>>>>>>>>>>>>>>> problems. I agree though that we will need something more than
> > >>>>>>>>>>>>>>>>>>>>> the error classes I'm proposing if we want to have different
> > >>>>>>>>>>>>>>>>>>>>> handling be configurable.
> > >>>>>>>>>>>>>>>>>>>>> My concern is that the open-endedness of a handler means that we
> > >>>>>>>>>>>>>>>>>>>>> are creating more problems than we are solving. It is still
> > >>>>>>>>>>>>>>>>>>>>> unclear to me how we expect to handle the errors. Perhaps we
> > >>>>>>>>>>>>>>>>>>>>> could include an example? It seems like there is a specific use
> > >>>>>>>>>>>>>>>>>>>>> case in mind and maybe we can
> >
> >
>
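The quoted thread above leaves two questions open: what a RETRY response should mean for an oversized record, and what should happen when handle() itself throws. The Java sketch below shows one possible answer to each. Everything in it is hypothetical. `Response`, `ExceptionHandler`, `OversizedRecordException` and `HandlerInvoker` are illustrative stand-ins, not the Kafka producer API or the KIP's final interface, and `OversizedRecordException` only plays the role of Kafka's `RecordTooLargeException`. The policy it assumes works as follows:

- RETRY is downgraded to FAIL for an error that retrying cannot fix, because producer configuration cannot change at runtime.
- RETRY is otherwise honored only a bounded number of times.
- An exception thrown by the handler is surfaced rather than silently ignored.

```java
// Hypothetical sketch: none of these types are part of the Kafka API.

/** Stand-in for the handler answers discussed in the thread. */
enum Response { RETRY, FAIL }

/** Hypothetical handler contract (a single method, so lambdas can implement it). */
interface ExceptionHandler {
    Response handle(String payload, Exception error);
}

/** Hypothetical stand-in for an error that retrying cannot fix, like an oversized record. */
class OversizedRecordException extends RuntimeException {
    OversizedRecordException(String message) {
        super(message);
    }
}

/**
 * Hypothetical wrapper that turns the handler's answer into the action taken.
 * The policy choices below are assumptions for illustration, not the KIP's design.
 */
final class HandlerInvoker {
    private final ExceptionHandler handler;
    private final int maxRetries;

    HandlerInvoker(ExceptionHandler handler, int maxRetries) {
        this.handler = handler;
        this.maxRetries = maxRetries;
    }

    /** attemptsSoFar counts the retries already performed for this record. */
    Response decide(String payload, Exception error, int attemptsSoFar) {
        final Response response;
        try {
            response = handler.handle(payload, error);
        } catch (RuntimeException handlerFailure) {
            // Surface a broken handler instead of ignoring it; keep the
            // original send error attached as a suppressed exception.
            IllegalStateException surfaced =
                new IllegalStateException("exception handler threw", handlerFailure);
            surfaced.addSuppressed(error);
            throw surfaced;
        }
        if (response == null || response == Response.FAIL) {
            // A null answer is treated as FAIL rather than guessed at.
            return Response.FAIL;
        }
        if (error instanceof OversizedRecordException) {
            // Size limits cannot change for a running producer, so retrying
            // the same record cannot succeed: downgrade RETRY to FAIL.
            return Response.FAIL;
        }
        // Honor RETRY only while the retry budget lasts.
        return attemptsSoFar < maxRetries ? Response.RETRY : Response.FAIL;
    }
}
```

With `maxRetries = 3`, an always-RETRY handler gets RETRY for a transient error while `attemptsSoFar` is 0, 1 or 2, and FAIL from 3 onward. An oversized record fails immediately. Surfacing the handler's own exception reflects the concern above about silent failures. The alternative also raised above, behaving as if no handler were configured, would instead catch the exception in the `catch` block and return the default response there.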
