Very interesting discussion.

It seems a central point is the question of "how generic" we should make this: some people think we need to be conservative, while others think we should be as generic as possible.

Personally, I think keeping a limited scope for this KIP (by explicitly saying we only cover RecordTooLarge and UnknownTopicOrPartition) might be better. We have concrete tickets that we address, while for other exceptions (like authorization) we don't know whether people want to handle them to begin with. Boiling the ocean might not get us too far, and being somewhat pragmatic might help to move this KIP forward. -- I also agree with Justine and Artem that we want to be careful anyway to not allow users to break stuff too easily.

At the same time, I agree that we should set up this change in a forward-looking way; having a single generic handler allows us to extend the handler more easily later. This should also simplify follow-up KIPs that might add new error cases. (I actually mentioned one more to Alieh already, but we both agreed that it might be best to exclude it from the KIP right now to make the 3.8 deadline. Doing a follow-up KIP is not the end of the world.)



@Chris:

(2) This sounds fair to me, but I am not sure how "bad" it actually would be? If the contract is clearly defined, what the KIP proposes seems fine, and given that such a handler is an expert API and we can provide "best practices" (cf. my other comment below in [AL1]), being a little bit pragmatic sounds fine to me.

Not sure if I understand Justine's argument on this question?


(3) About having a default impl or not: I am fine with adding one, even if I am not sure why Connect could not just add its own and plug it in (we would add corresponding configs for Connect, but not for the producer itself)? We could also do this in a follow-up KIP, but I am happy to include it in this KIP to provide value to Connect right away (even if the value might not come right away if we miss the 3.8 deadline due to expanded KIP scope...) -- For KS, we would for sure plug in our own impl, and lock down the config such that users cannot set their own handler on the internal producer to begin with. It might be good to elaborate on why the producer should have a default? We might actually want to add this to the KIP right away?

The key for a default impl would be to not change the current behavior, and having no default seems to achieve this. For the two cases you mentioned, it's unclear to me what default value for the "upper bound on retries" for UnknownTopicOrPartitionException we should set? It seems it would need to be the same as `delivery.timeout.ms`? However, if users have `delivery.timeout.ms` overridden, we would need to set this config somewhat "dynamically"? Is this feasible? If we hard-code 2 minutes, it might not be backward compatible. I have the impression we might introduce some undesired coupling? -- For the "record too large" case, the config seems to be boolean, and setting it to `false` by default seems to provide backward compatibility.
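To make the tradeoff concrete, here is a rough sketch of what such a config-driven default handler could look like. All names (the Response enum, the config semantics, the exception classes as stand-ins for the real org.apache.kafka.common.errors types) are invented for illustration and are not the KIP API; the point is only that the oversized-record flag defaults to `false` and the retry bound is an explicit deadline rather than a hard-coded 2 minutes:

```java
// Sketch only: a default handler whose behavior is driven by two configs,
// chosen so that the defaults preserve today's producer behavior.
public class DefaultHandlerSketch {
    enum Response { FAIL, SWALLOW, RETRY }

    // Local stand-ins for the real Kafka exception types.
    static class RecordTooLargeException extends RuntimeException {}
    static class UnknownTopicOrPartitionException extends RuntimeException {}

    final boolean dropOversized;   // hypothetical boolean config, default false
    final long retryDeadlineMs;    // hypothetical bound, e.g. aligned with delivery.timeout.ms

    DefaultHandlerSketch(boolean dropOversized, long retryDeadlineMs) {
        this.dropOversized = dropOversized;
        this.retryDeadlineMs = retryDeadlineMs;
    }

    // elapsedMs: how long this record has already been retried.
    Response handle(Exception e, long elapsedMs) {
        if (e instanceof RecordTooLargeException) {
            return dropOversized ? Response.SWALLOW : Response.FAIL;
        }
        if (e instanceof UnknownTopicOrPartitionException) {
            return elapsedMs < retryDeadlineMs ? Response.RETRY : Response.FAIL;
        }
        return Response.FAIL; // everything else: keep current fail-fast behavior
    }

    public static void main(String[] args) {
        DefaultHandlerSketch h = new DefaultHandlerSketch(false, 120_000);
        System.out.println(h.handle(new RecordTooLargeException(), 0));               // FAIL (backward compatible)
        System.out.println(h.handle(new UnknownTopicOrPartitionException(), 10_000)); // RETRY (still under deadline)
    }
}
```

With both configs left at these defaults, the handler never changes existing behavior, which is the backward-compatibility property discussed above.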



@Artem:

[AL1] While I see the point, I would think having a different callback for every exception might not really be elegant? In the end, the handler is a very advanced feature anyway, and if it's implemented in a bad way, well, it's a user error -- we cannot protect users from everything. To me, a handler like this is to some extent "business logic", and if a user gets business logic wrong, it's hard to protect them. -- We would of course provide best-practice guidance in the JavaDocs, and explain that a handler should have explicit `if` statements for the cases it wants to handle, and only a single default branch that returns FAIL.
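A minimal sketch of that best practice (the Response enum and handler shape are assumptions based on this discussion, not the final KIP API, and the exception classes are local stand-ins for the real org.apache.kafka.common.errors types):

```java
// "Explicit ifs, single FAIL default" best practice for a producer
// exception handler, as discussed in [AL1]. Sketch only.
public class HandlerSketch {
    enum Response { FAIL, SWALLOW, RETRY }

    // Stand-ins for the two exceptions covered by the KIP.
    static class RecordTooLargeException extends RuntimeException {}
    static class UnknownTopicOrPartitionException extends RuntimeException {}

    // Match only the exceptions we consciously decided to handle; anything
    // else (including types added by future KIPs) falls through to FAIL.
    static Response handle(Exception e) {
        if (e instanceof RecordTooLargeException) {
            return Response.SWALLOW;   // drop oversized records
        }
        if (e instanceof UnknownTopicOrPartitionException) {
            return Response.RETRY;     // metadata may still propagate
        }
        return Response.FAIL;          // safe default for everything else
    }

    public static void main(String[] args) {
        System.out.println(handle(new RecordTooLargeException()));          // SWALLOW
        System.out.println(handle(new IllegalStateException("new type")));  // FAIL
    }
}
```

The single FAIL default is what keeps a handler like this from silently swallowing error types it was never written to consider.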


[AL2] Yes, but for KS we would retry at the application layer. I.e., if the TX is not completed, we clean up and set up our task from scratch, to ensure the pending transaction is completed before we resume. If the TX was indeed aborted, we would retry from the older offset and thus just hit the same error again, and the loop begins anew.


[AL2 cont.] Similar to AL1, I see such a handler to some extent as business logic. If a user puts a bad filter condition in their KS app and drops messages, there is nothing we can do about it, and this handler, IMHO, has a similar purpose. This is also the line of thinking I apply to EOS, to address Justine's concern about "should we allow dropping records under EOS", and my answer is "yes", because it's more business logic than actual error handling IMHO. And by default, we fail... So users opt in to add business logic to drop records. It's an application-level decision how to write the code.


[AL3] Maybe I misunderstand what you are saying, but to me, checking the size of the record upfront is exactly what the KIP proposes, no?



@Justin:

I saw the sample code -- is it just an if statement checking for the error before the handler is invoked? That seems a bit fragile.

What do you mean by fragile? Not sure if I see your point.




-Matthias

On 5/7/24 5:33 PM, Artem Livshits wrote:
Hi Alieh,

Thanks for the KIP.  The motivation talks about very specific cases, but
the interface is generic.

[AL1]
If the interface evolves in the future I think we could have the following
confusion:

1. A user implemented the SWALLOW action for both RecordTooLargeException and UnknownTopicOrPartitionException. For simplicity they just return SWALLOW from the function, because it elegantly handles all known cases.
2. The interface has evolved to support a new exception.
3. The user has upgraded their Kafka client.

Now a new kind of error gets dropped on the floor without the user's intention, and it would be super hard to detect and debug.

To avoid the confusion, I think we should use handlers for specific exceptions. Then if a new exception is added, it won't get silently swallowed, because the user would need to add new functionality to handle it.

I also have some higher level comments:

[AL2]
it throws a TimeoutException, and the user can only blindly retry, which
may result in an infinite retry loop

If the TimeoutException happens during transactional processing (exactly-once is the desired semantics), then the client should not retry when it gets a TimeoutException, because without knowing the reason for the TimeoutException, the client cannot know whether the message actually got produced or not, and retrying the message may result in duplicates.

The thrown TimeoutException "cuts" the connection to the underlying root
cause of missing metadata

Maybe we should fix the error handling and return the proper underlying cause? Then the application can properly handle the error based on its preferences.

From the product perspective, it's not clear how safe it is to blindly ignore UnknownTopicOrPartitionException. This could lead to situations where a simple typo causes massive data loss (part of the data would effectively be produced to a "black hole" and the user may not notice it for a while).

In which situations would you recommend the user to "black hole" messages
in case of misconfiguration?

[AL3]

If the custom handler decides on SWALLOW for RecordTooLargeException,

Is my understanding correct that this KIP proposes functionality that would only be able to SWALLOW a RecordTooLargeException that happens because the producer cannot produce the record (if the broker rejects the batch, the error won't get to the handler, because we cannot know which other records get ignored)? In this case, why not just check the locally configured max record size upfront and not produce the record in the first place? Maybe we can expose a validation function from the producer that could validate the records locally, so we don't need to produce the record in order to know that it's invalid.

-Artem

On Tue, May 7, 2024 at 2:07 PM Justine Olshan <jols...@confluent.io.invalid>
wrote:

Alieh and Chris,

Thanks for clarifying 1), but I saw the motivation. I guess I just didn't understand how that would be ensured on the producer side. I saw the sample code -- is it just an if statement checking for the error before the handler is invoked? That seems a bit fragile.

Can you clarify what you mean by "since the code does not reach the KS interface and breaks somewhere in the producer"? If we surfaced this error to the application in a better way, would that also be a solution to the issue?

Justine

On Tue, May 7, 2024 at 1:55 PM Alieh Saeedi <asae...@confluent.io.invalid>
wrote:

Hi,


Thank you, Chris and Justine, for the feedback.


@Chris

1) Flexibility: it has two meanings. The first meaning is the one you mentioned: we are going to cover more exceptions in the future, but as Justine mentioned, we must be very conservative about adding more exceptions. Additionally, flexibility mainly means that the user is able to develop their own code. As mentioned in the motivation section and the examples, sometimes the user decides on dropping a record based on the topic, for example.


2) Defining two separate methods for retriable and non-retriable exceptions: although the idea is brilliant, the user may still make a mistake by implementing the wrong method and see unexpected behaviour. For example, they may implement handleRetriable() for RecordTooLargeException and define SWALLOW for the exception, but in practice see no change in the default behaviour, since they implemented the wrong method. I think we can never reduce the user's mistakes to 0.



3) Default implementation for the handler: the default behaviour is already preserved with NO need to implement any handler or set the corresponding config parameter `custom.exception.handler`. What you mean is actually having a second default, which requires having both the interface and config parameters. About UnknownTopicOrPartitionException: the producer already offers the config parameter `max.block.ms`, which determines the duration of retrying. The main purpose of the user who needs the handler is to get the root cause of the TimeoutException and handle it in the way they intend. The KIP explains the necessity of this for KS users.


4) Naming issue: by SWALLOW, we actually meant swallow the error, while SKIP means skip the record, I think. If it makes sense for more people, I can change it to SKIP.


@Justine

1) was addressed by Chris.

2 and 3) The problem is exactly what you mentioned. Currently, there is no way to handle these issues application-side. Even KS users who implement the KS ProductionExceptionHandler are not able to handle the exceptions as they intend, since the code does not reach the KS interface and breaks somewhere in the producer.

Cheers,
Alieh

On Tue, May 7, 2024 at 8:43 PM Chris Egerton <fearthecel...@gmail.com>
wrote:

Hi Justine,

The method signatures for the interface are indeed open-ended, but the KIP states that its uses will be limited. See the motivation section:

We believe that the user should be able to develop custom exception handlers for managing producer exceptions. On the other hand, this will be an expert-level API, and using that may result in strange behaviour in the system, making it hard to find the root cause. Therefore, the custom handler is currently limited to handling RecordTooLargeException and UnknownTopicOrPartitionException.

Cheers,

Chris


On Tue, May 7, 2024, 14:37 Justine Olshan <jols...@confluent.io.invalid

wrote:

Hi Alieh,

I was out for KSB and then was also sick. :(

To your point 1) Chris, I don't think it is limited to two specific scenarios, since the interface accepts a generic Exception e and can be implemented to check whether that e is an instanceof any exception. I didn't see anywhere that specific errors are enforced. I'm a bit concerned about this actually. I'm concerned about the open-endedness and the contract we have with transactions. We are allowing the client to make decisions that are somewhat invisible to the server. As an aside, can we build in log messages when the handler decides to skip a message, etc.? I'm really concerned about messages being silently dropped.

I do think Chris's point 2) about retriable vs. non-retriable errors is fair. I'm a bit concerned about skipping an unknown topic or partition exception too early, as there are cases where it can be transient.

I'm still a little bit wary of allowing dropping records as part of EOS generally, as in many cases these errors signify an issue with the original data. I understand that Streams and Connect/MirrorMaker may have reasons they want to progress past these messages, but I am wondering if there is a way that can be done application-side. I'm willing to accept this sort of proposal if we can make it clear that this sort of thing is happening and we limit the blast radius of what we can do.

Justine

On Tue, May 7, 2024 at 9:55 AM Chris Egerton <chr...@aiven.io.invalid

wrote:

Hi Alieh,

Sorry for the delay, I've been out sick. I still have some thoughts that I'd like to see addressed before voting.

1) If flexibility is the motivation for a pluggable interface, why are we only limiting the uses for this interface to two very specific scenarios? Why not also allow, e.g., authorization errors to be handled as well (allowing users to drop records destined for some off-limits topics, or retry for a limited duration in case there's a delay in the propagation of ACL updates)? It'd be nice to see some analysis of other errors that could be handled with this new API, both to avoid the follow-up work of another KIP to address them in the future, and to make sure that we're not painting ourselves into a corner with the current API in a way that would make future modifications difficult.

2) Something feels a bit off with how retriable vs. non-retriable errors are handled with the interface. Why not introduce two separate methods to handle each case separately? That way there's no ambiguity or implicit behavior when, e.g., attempting to retry on a RecordTooLargeException. This could be something like `NonRetriableResponse handle(ProducerRecord, Exception)` and `RetriableResponse handleRetriable(ProducerRecord, Exception)`, though the exact names and shape can obviously be toyed with a bit.

3) Although the flexibility of a pluggable interface may benefit some users' custom producer applications and Kafka Streams applications, it comes at significant deployment cost for other low-/no-code environments, including but not limited to Kafka Connect and MirrorMaker 2. Can we add a default implementation of the exception handler that allows for some simple behavior to be tweaked via configuration properties? Two things that would be nice to have would be A) an upper bound on the retry time for unknown-topic-partition exceptions and B) an option to drop records that are large enough to trigger a record-too-large exception.

4) I'd still prefer to see "SKIP" or "DROP" instead of the proposed "SWALLOW" option, which IMO is opaque and non-obvious, especially when trying to guess the behavior for retriable errors.

Cheers,

Chris

On Fri, May 3, 2024 at 11:23 AM Alieh Saeedi
<asae...@confluent.io.invalid

wrote:

Hi all,


A summary of the KIP and the discussions:


The KIP introduces a handler interface for the Producer in order to handle two exceptions: RecordTooLargeException and UnknownTopicOrPartitionException. The handler handles the exceptions per record.


- Do we need this handler?  [Motivation and Examples sections]


RecordTooLargeException: 1) In transactions, the producer collects multiple records in batches. A RecordTooLargeException related to a single record then leads to failing the entire batch. A custom exception handler in this case may decide to drop the record and continue the processing. See Example 1, please. 2) Moreover, in Kafka Streams, a record that is too large is a poison pill record, and there is no way to skip over it. A handler would allow us to react to this error inside the producer, i.e., local to where the error happens, and thus simplify the overall code significantly. Please read the Motivation section for more explanation.


UnknownTopicOrPartitionException: For this case, the producer handles the exception internally, only issues a WARN log about missing metadata, and retries internally. Later, when the producer hits `delivery.timeout.ms`, it throws a TimeoutException, and the user can only blindly retry, which may result in an infinite retry loop. The thrown TimeoutException "cuts" the connection to the underlying root cause of missing metadata (which could indeed be a transient error but is persistent for a non-existing topic). Thus, there is no programmatic way to break the infinite retry loop. Kafka Streams also blindly retries in this case, and the application gets stuck.



- Having an interface vs. a configuration option: [Motivation, Examples, and Rejected Alternatives sections]

Our solution is introducing an interface due to the full flexibility that it offers. Sometimes users, especially Kafka Streams ones, determine the handler's behaviour based on the situation. For example, facing an UnknownTopicOrPartitionException, the user may want to raise an error for some topics but retry it for other topics. Having a configuration option with a fixed set of possibilities does not serve the user's needs. See Example 2, please.


- Note on RecordTooLargeException: [Public Interfaces section]

If the custom handler decides on SWALLOW for a RecordTooLargeException, then this record will not be part of the transaction's batch and will also not be sent to the broker in non-transactional mode. So no worries about getting a RecordTooLargeException from the broker in this case, as the record will never be sent to the broker. SWALLOW means drop the record and continue/swallow the error.


- What if the handle() method implements RETRY for
RecordTooLargeException?
[Proposed Changes section]

We have to limit the user to only have FAIL or SWALLOW for
RecordTooLargeException. Actually, RETRY must be equal to FAIL.
This
is
well documented/informed in javadoc.



- What if the handle() method of the handler throws an exception?

The handler is expected to have correct code. If it throws an exception, everything fails.



This is a PoC PR <https://github.com/apache/kafka/pull/15846> ONLY for RecordTooLargeException. The code changes related to UnknownTopicOrPartitionException will be added to this PR LATER.


Looking forward to your feedback again.


Cheers,

Alieh

On Thu, Apr 25, 2024 at 11:46 PM Kirk True <k...@kirktrue.pro>
wrote:

Hi Alieh,

Thanks for the updates!

Comments inline...


On Apr 25, 2024, at 1:10 PM, Alieh Saeedi
<asae...@confluent.io.INVALID

wrote:

Hi all,

Thanks a lot for the constructive feedbacks!



Addressing some of the main concerns:


- The `RecordTooLargeException` can be thrown by the broker, producer, and consumer. Of course, the `ProducerExceptionHandler` interface is introduced to affect only the exceptions thrown from the producer. This KIP very specifically means to provide a possibility to manage the `RecordTooLargeException` thrown from the Producer.send() method. Please see the "Proposed Changes" section for more clarity. I investigated the issue there thoroughly. I hope it can explain the concern about how we handle the errors as well.



- The problem with Callback: methods of Callback are called when the record sent to the server is acknowledged, while this is not the desired time for all exceptions. We intend to handle exceptions beforehand.

I guess it makes sense to keep the expectation for when Callback is invoked as-is vs. shoehorning more into it.

- What if the custom handler returns RETRY for `RecordTooLargeException`? I assume changing the producer configuration at runtime is possible. If so, RETRY for a too-large record is valid, because maybe in the next try the too-large record is not poisoning any more. I am not 100% sure about the technical details, though. Otherwise, we can consider RETRY as FAIL for this exception. Another solution would be to consider a constant number of times for RETRY, which can be useful for other exceptions as well.

It’s not presently possible to change the configuration of an existing Producer at runtime. So if a record hits a RecordTooLargeException once, no amount of retrying (with the current Producer) will change that fact. So I’m still a little stuck on how to handle a response of RETRY for an “oversized” record.

- What if the handle() method itself throws an exception? I think, rationally and pragmatically, the behaviour must be exactly like when no custom handler is defined, since the user actually did not have a working handler.

I’m not convinced that ignoring an errant handler is the right choice. It then becomes a silent failure that might have repercussions, depending on the business logic. A user would have to proactively trawl through the logs for WARN/ERROR messages to catch it.

Throwing a hard error is pretty draconian, though…

- Why not use config parameters instead of an interface? As explained in the “Rejected Alternatives” section, we assume that the handler will be used for a greater number of exceptions in the future. Defining a configuration parameter for each exception may make the configuration a bit messy. Moreover, the handler offers more flexibility.

Agreed that the logic-via-configuration approach is weird and limiting. Forget I ever suggested it ;)

I’d think additional background in the Motivation section would help me understand how users might use this feature beyond a) skipping “oversized” records, and b) not retrying missing topics.

Small change:

- ProductionExceptionHandlerResponse -> Response, for brevity and simplicity. Could’ve been HandlerResponse too, I think!

The name change sounds good to me.

Thanks Alieh!



I thank you all again for your useful questions/suggestions.

I would be happy to hear more of your concerns, as stated in some feedback.

Cheers,
Alieh

On Wed, Apr 24, 2024 at 12:31 AM Justine Olshan
<jols...@confluent.io.invalid> wrote:

Thanks Alieh for the updates.

I'm a little concerned about the design pattern here. It seems like we want specific usages, but we are packaging it as a generic handler. I think we tried to narrow down on the specific errors we want to handle, but it feels a little clunky, as we have a generic thing for two specific errors.

I'm wondering if we are using the right patterns to solve these problems. I agree, though, that we will need something more than the error classes I'm proposing if we want to have different handling be configurable. My concern is that the open-endedness of a handler means that we are creating more problems than we are solving. It is still unclear to me how we expect to handle the errors. Perhaps we could include an example? It seems like there is a specific use case in mind, and maybe we can make a design that is tighter and supports that case.

Justine

On Tue, Apr 23, 2024 at 3:06 PM Kirk True <
k...@kirktrue.pro>
wrote:

Hi Alieh,

Thanks for the KIP!

A few questions:

K1. What is the expected behavior for the producer if it generates a RecordTooLargeException, but the handler returns RETRY?
K2. How do we determine which record was responsible for the UnknownTopicOrPartitionException, since we get that response when sending a batch of records?
K3. What is the expected behavior if the handle() method itself throws an error?
K4. What is the downside of adding an onError() method to the Producer’s Callback interface vs. a new mechanism?
K5. Can we change “ProducerExceptionHandlerResponse” to just “Response”, given that it’s an inner enum?
K6. Any recommendation for callback authors to handle different behavior for different topics?

I’ll echo what others have said: it would help me understand why we want another handler class if there were more examples in the Motivation section. As it stands now, I agree with Chris that the stated issues could be solved by adding two new configuration options:

    oversized.record.behavior=fail
    retry.on.unknown.topic.or.partition=true

What I’m not yet able to wrap my head around is: what exactly would the logic in the handler be? I’m not very imaginative, so I’m assuming they’d mostly be if-this-then-that. However, if they’re more complicated, I’d have other concerns.

Thanks,
Kirk

On Apr 22, 2024, at 7:38 AM, Alieh Saeedi
<asae...@confluent.io.INVALID

wrote:

Thank you all for the feedback!

Addressing the main concern: the KIP is about giving the user the ability to handle producer exceptions, but to be more conservative and avoid future issues, we decided to limit it to a short list of exceptions. I included RecordTooLargeException and UnknownTopicOrPartitionException. Open to suggestions for adding some more ;-)

KIP updates:
- clarified the way that the user should configure the Producer to use the custom handler. I think adding a producer config property is the cleanest one.
- changed the ClientExceptionHandler to ProducerExceptionHandler to be closer to what we are changing.
- added the ProducerRecord as an input parameter of the handle() method as well.
- increased the response types to 3, to have fail and two types of continue.
- the default behaviour is having no custom handler, with the corresponding config parameter set to null. Therefore, the KIP provides no default implementation of the interface.
- we follow the interface solution as described in the Rejected Alternatives section.


Cheers,
Alieh


On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax <
mj...@apache.org

wrote:

Thanks for the KIP, Alieh! It addresses an important case for error handling.

I agree that using this handler would be an expert API, as mentioned by a few people. But I don't think that would be a reason to not add it. It's always a tricky tradeoff what to expose to users and to avoid foot guns, but we added similar handlers to Kafka Streams and have good experience with them. Hence, I understand, but don't share, the concern raised.

I also agree that there is some responsibility on the user to understand how such a handler should be implemented to not drop data by accident. But it seems unavoidable and acceptable.

While I understand that a "simpler / reduced" API (e.g., via configs) might also work, I personally prefer a full handler. Configs have the same issue that they could be misused, potentially leading to incorrectly dropped data, but at the same time they are less flexible (and thus maybe even harder to use correctly...?). Based on my experience, there are also often weird corner cases for which it makes sense to also drop records for other exceptions, and a full handler has the advantage of full flexibility and "absolute power!".

To be fair: I don't know the exact code paths of the producer in detail, so please keep me honest. But my understanding is that the KIP aims to allow users to react to internal exceptions, and decide to keep retrying internally, swallow the error and drop the record, or raise the error?

Maybe the KIP needs to be a little more precise about what errors we want to cover -- I don't think this list must be exhaustive, as we can always do a follow-up KIP to also apply the handler to other errors and expand the scope of the handler. The KIP does mention examples, but it might be good to explicitly state for what cases the handler gets applied?

I am also not sure if CONTINUE and FAIL are enough options? Don't we need three options? Or would `CONTINUE` have different meanings depending on the type of error? I.e., for a retriable error `CONTINUE` would mean keep retrying internally, but for a non-retriable error `CONTINUE` means swallow the error and drop the record? This semantic overload seems tricky for users to reason about, so it might be better to split `CONTINUE` into two cases -> `RETRY` and `SWALLOW` (or some better names).

Additionally, should we just ship a `DefaultClientExceptionHandler` which would return `FAIL`, for backward compatibility? Or have no default handler to begin with and allow it to be `null`? I don't see the need for a specific `TransactionExceptionHandler`. To me, the goal should be to not modify the default behavior at all, but to just allow users to change the default behavior if there is a need.

What is missing from the KIP, though, is how the handler is passed into the producer. Would we need a new config which allows setting a custom handler? And/or would we allow passing in an instance via the constructor, or add a new method to set a handler?


-Matthias

On 4/18/24 10:02 AM, Andrew Schofield wrote:
Hi Alieh,
Thanks for the KIP.

Exception handling in the Kafka producer and consumer is really not ideal. It’s even harder working out what’s going on with the consumer.

I’m a bit nervous about this KIP, and I agree with Chris that it could do with additional motivation. This would be an expert-level interface given how complicated the exception handling for Kafka has become.

7. The application is not really aware of the batching being done on its behalf. The ProduceResponse can actually return an array of records which failed per batch. If you get a RecordTooLargeException and want to retry, you probably need to remove the offending records from the batch and retry it. This is getting fiddly.

8. There is already o.a.k.clients.producer.Callback. I wonder whether an alternative might be to add a method to the existing Callback interface, such as:

  ClientExceptionResponse onException(Exception exception)

It would be called when a ProduceResponse contains an error, but the producer is going to retry. It tells the producer whether to go ahead with the retry or not. The default implementation would be to CONTINUE, because that’s just continuing to retry as planned. Note that this is a per-record callback, so the application would be able to understand which records failed.

By using an existing interface, we already know how to configure it, and we know about the threading model for calling it.


Thanks,
Andrew



On 17 Apr 2024, at 18:17, Chris Egerton
<chr...@aiven.io.INVALID

wrote:

Hi Alieh,

Thanks for the KIP! The issue with writing to non-existent topics is particularly frustrating for users of Kafka Connect and has been the source of a handful of Jira tickets over the years. My thoughts:

1. An additional detail we can add to the motivation (or possibly rejected alternatives) section is that this kind of custom retry logic can't be implemented by hand by, e.g., setting retries to 0 in the producer config and handling exceptions at the application level. Or rather, it can, but 1) it's a bit painful to have to reimplement at every call-site for Producer::send (and any code that awaits the returned Future) and 2) it's impossible to do this without losing idempotency on retries.

2. That said, I wonder if a pluggable interface is really the right call here. Documenting the interactions of a producer with a ClientExceptionHandler instance will be tricky, and implementing them will also be a fair amount of work. I believe that there needs to be some more granularity for how writes to non-existent topics (or really, UNKNOWN_TOPIC_OR_PARTITION and related errors from the broker) are handled, but I'm torn between keeping it simple with maybe one or two new producer config properties, or a full-blown pluggable interface. If there are more cases that would benefit from a pluggable interface, it would be nice to identify these and add them to the KIP to strengthen the motivation. Right now, I'm not sure the two that are mentioned in the motivation are sufficient.

3. Alternatively, a possible compromise is for this KIP to introduce new properties that dictate how to handle unknown-topic-partition and record-too-large errors, with the thinking that if we introduce a pluggable interface later on, these properties will be recognized by the default implementation of that interface but could be completely ignored or replaced by alternative implementations.

4. (Nit) You can remove the "This page is meant as a template for writing a KIP..." part from the KIP. It's not a template anymore :)

5. If we do go the pluggable interface route, wouldn't we want to add the possibility for retry logic? The simplest version of this could be to add a RETRY value to the ClientExceptionHandlerResponse enum.

6. I think "SKIP" or "DROP" might be clearer than "CONTINUE" for the ClientExceptionHandlerResponse enum, since they cause records to be dropped.

Cheers,

Chris

On Wed, Apr 17, 2024 at 12:25 PM Justine Olshan
<jols...@confluent.io.invalid> wrote:

Hey Alieh,

I echo what Omnia says: I'm not sure I understand the implications of the change, and I think more detail is needed.
This comment also confused me a bit:

* {@code ClientExceptionHandler} that continues the transaction even if a record is too large.
* Otherwise, it makes the transaction fail.

Relatedly, I've been working with some folks on a KIP for transaction errors and how they are handled. Specifically for the RecordTooLargeException (and a few other errors), we want to give a new error category for this error that allows the application to choose how it is handled. Maybe this KIP is something that you are looking for? Stay tuned :)

Justine





On Wed, Apr 17, 2024 at 8:03 AM Omnia Ibrahim <
o.g.h.ibra...@gmail.com

wrote:

Hi Alieh,
Thanks for the KIP! I have a couple of comments.
- You mentioned in the KIP motivation:
Another example for which a production exception handler could be useful is if a user tries to write into a non-existing topic, which returns a retryable error code; with infinite retries, the producer would hang retrying forever. A handler could help to break the infinite retry loop.

How can the handler differentiate between something that is temporary, where it should keep retrying, and something permanent, like forgetting to create the topic? Temporary here could be:
- the producer gets deployed before the topic creation finishes (especially if the topic creation is handled via IaC)
- temporarily offline partitions
- leadership changes
Isn’t this putting the producer at risk of dropping records unintentionally?
- Can you elaborate more on what is written in the compatibility / migration plan section, please, by explaining in a bit more detail what the changing behaviour is and how this will impact clients who are upgrading?
- In the proposed changes, can you elaborate in the KIP on where in the producer lifecycle ClientExceptionHandler and TransactionExceptionHandler get triggered, and how the producer will be configured to point to a customized implementation?

Thanks
Omnia

On 17 Apr 2024, at 13:13, Alieh Saeedi
<asae...@confluent.io.INVALID

wrote:

Hi all,
Here is KIP-1038: Add Custom Error Handler to Producer:
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1038%3A+Add+Custom+Error+Handler+to+Producer>

I look forward to your feedback!

Cheers,
Alieh

















