Hey,

It seems this KIP is very difficult, and we actually had a lot of background discussion about it over the last weeks. I believe the problem with this KIP is that we have three starting points for looking at the problem, but these three starting points don't align:


1. Producer API: we want a clean API design for developers using the producer directly (clean semantics, no footguns...)

2. Kafka Streams: as a power user of the producer, we want advanced capabilities; given how KS works internally, we need a "power API" on the producer

3. Kafka Connect: also a power user of the producer. However, Connect is a framework, not a programming API, and thus prefers a config-based approach


I also think we got one idea wrong: letting the user code / handler take care of retries. (I guess that's on me; I started with the idea of having a third return code RETRY...) The handler does not have enough context information, and making this information available leads to a very clumsy interface. (This defeats (1) from above.)

I believe that if we moved forward with the handler, we would need to let the producer do the retries, and only call the handler after all retries/timeouts are exhausted. However, for this to work, we would need a producer config for Connect, which basically defeats the purpose of (2), i.e., having a programmatic solution (it seems somewhat redundant).

Also, the idea of making the handler configurable seems, in hindsight, like a poor approach / bad compromise: it addresses (3) without sacrificing (2), but is a problem for (1).


We also discussed the "missing metadata" case, and we actually believe we can address it without a public API change. Alieh has already put up a PR for this: https://github.com/apache/kafka/pull/16344


This leaves us with the "producer error state" problem for EOS, but it might be better to solve this differently. Alieh has now started KIP-1059 for this case.


Thus, it seems we should DISCARD this KIP, and the Connect team can do a follow-up KIP to add the producer configs they need for their own situation.

Splitting the problem into solutions tailored to the different situations seems to lead to an overall cleaner result.

Thoughts?


-Matthias



On 5/15/24 12:30 AM, Federico Valeri wrote:
Hello Alieh, thanks for this useful KIP.

There is a typo in the motivation when you talk about the
UnknownTopicOrPartitionException. It's delivery.timeout.ms, not
deliver.timeout.ms.

In the past, I did some work to improve and clean up the official Kafka
examples, which I think are useful for new Kafka users. I was
wondering if it is worth improving them in order to show the correct
usage of this new interface. If you agree, maybe we could mention this
in the proposed changes.

The accepted responses for RecordTooLargeException are FAIL and SWALLOW. 
Therefore, RETRY will be interpreted and executed as FAIL.

Why do we need this javadoc note? I think it's not possible to return
RETRY in the current form.

When we talk about swallowing in the default implementation, I think
we will log an error/warning and drop the record, right? If yes, should
we clarify this and improve the DROP_INVALID_LARGE_RECORDS_DOC by
mentioning the logging part?

Should we mention somewhere which logic takes precedence when both the
interface and configs are used?

On Tue, May 14, 2024 at 4:45 PM Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi Alieh,

Thank you for all the updates! One final question--how will the retry
timeout for unknown topic partition errors be implemented? I think it would
be best if this could be done with an implementation of the error handler,
but I don't see a way to track the necessary information with the
current ProducerExceptionHandler interface.

Cheers,

Chris

On Tue, May 14, 2024 at 9:10 AM Alieh Saeedi <asae...@confluent.io.invalid>
wrote:

Thanks Andrew. Done :)

@Chris: I changed the config parameter type from boolean to integer, which
defines the timeout for retrying. I thought reusing `max.block.ms` was not
reasonable as you mentioned.

So if the KIP looks good, let's skip to the good part ;-) VOTING :)

Bests,
Alieh





On Tue, May 14, 2024 at 12:26 PM Andrew Schofield <
andrew_schofi...@live.com>
wrote:

Hi Alieh,
Just one final comment.

[AJS5] Existing classes use Retriable, not Retryable. For example:


https://kafka.apache.org/21/javadoc/org/apache/kafka/common/errors/RetriableException.html

I suggest RetriableResponse and NonRetriableResponse.

Thanks,
Andrew

On 13 May 2024, at 23:17, Alieh Saeedi <asae...@confluent.io.INVALID>
wrote:

Hi all,


Thanks for all the valid points you listed.


KIP updates and addressing concerns:


1) The KIP now suggests two Response types: `RetryableResponse` and
`NonRetryableResponse`


2) `custom.exception.handler` is changed to
`custom.exception.handler.class`


3) The KIP clarifies that `In the case of an implemented handler for the specified exception, the handler takes precedence.`


4)  There is now a `default` implementation for both handle() methods.


5)  @Chris: for `UnknownTopicOrPartition`, the default is already retrying for 60s (in fact, the default value of `max.block.ms`). If the handler instructs to FAIL or SWALLOW, there will be no retry, and if the handler instructs to RETRY, that will be the default behavior, which follows the values of already existing config parameters such as `max.block.ms`. Does that make sense?


Hope the changes and explanations are convincing :)


Cheers,

Alieh

On Mon, May 13, 2024 at 6:40 PM Justine Olshan
<jols...@confluent.io.invalid>
wrote:

Oh I see. The type isn't the error type but a newly defined type for the response. Makes sense and works for me.

Justine

On Mon, May 13, 2024 at 9:13 AM Chris Egerton <
fearthecel...@gmail.com>
wrote:

If we have dedicated methods for each kind of exception (handleRecordTooLarge, handleUnknownTopicOrPartition, etc.), doesn't that provide sufficient constraint? I'm not suggesting we eliminate these methods, just that we change their return types to something more flexible.

On Mon, May 13, 2024, 12:07 Justine Olshan
<jols...@confluent.io.invalid

wrote:

I'm not sure I agree with the Retriable and NonRetriableResponse comment. This doesn't limit the blast radius or enforce that certain errors are used. I think we might disagree on how controlled these interfaces can be...

Justine

On Mon, May 13, 2024 at 8:40 AM Chris Egerton
<chr...@aiven.io.invalid

wrote:

Hi Alieh,

Thanks for the updates! I just have a few more thoughts:

- I don't think a boolean property is sufficient to dictate retries for unknown topic partitions, though. These errors can occur if a topic has just been created, which can occur if, for example, automatic topic creation is enabled for a multi-task connector. This is why I proposed a timeout instead of a boolean (and see my previous email for why reducing max.block.ms for a producer is not a viable alternative). If it helps, one way to reproduce this yourself is to add the line `fooProps.put(TASKS_MAX_CONFIG, "10");` to the integration test here:
https://github.com/apache/kafka/blob/5439914c32fa00d634efa7219699f1bc21add839/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java#L134
and then check the logs afterward for messages like "Error while fetching metadata with correlation id <n> : {foo-topic=UNKNOWN_TOPIC_OR_PARTITION}".

- I also don't think we need custom XxxResponse enums for every possible method; it seems like this will lead to a lot of duplication and cognitive overhead if we want to expand the error handler in the future. Something more flexible like RetriableResponse and NonRetriableResponse could suffice.

- Finally, the KIP still doesn't state how the handler will or won't take precedence over existing retry properties. If I set `retries` or `delivery.timeout.ms` or `max.block.ms` to low values, will that cause retries to cease even if my custom handler would otherwise keep returning RETRY for an error?

Cheers,

Chris

On Mon, May 13, 2024 at 11:02 AM Andrew Schofield <
andrew_schofi...@live.com>
wrote:

Hi Alieh,
Just a few more comments on the KIP. It is looking much less risky now the scope is tighter.

[AJS1] It would be nice to have default implementations of the handle methods so an implementor would not need to implement both themselves.

[AJS2] Producer configurations which are class names usually end in “.class”. I suggest “custom.exception.handler.class”.

[AJS3] If I implemented a handler, and I set a non-default value for one of the new configurations, what happens? I would expect that the handler takes precedence. I wasn’t quite clear what “the control will follow the handler instructions” meant.

[AJS4] Because you now have an enum for the RecordTooLargeExceptionResponse, I don’t think you need to state in the comment for ProducerExceptionHandler that RETRY will be interpreted as FAIL.

Thanks,
Andrew

On 13 May 2024, at 14:53, Alieh Saeedi
<asae...@confluent.io.INVALID

wrote:

Hi all,


Thanks for the very interesting discussion during my PTO.


KIP updates and addressing concerns:


1) Two handle() methods are defined in ProducerExceptionHandler for the two exceptions, with different input parameters, so that we have handle(RecordTooLargeException e, ProducerRecord record) and handle(UnknownTopicOrPartitionException e, ProducerRecord record) (a rough sketch of this shape is included below)


2) The ProducerExceptionHandler extends `Closeable` as well.


3) The KIP suggests having two more configuration parameters with boolean values:

- `drop.invalid.large.records` with a default value of `false`, for swallowing too-large records.

- `retry.unknown.topic.partition` with a default value of `true`, which performs RETRY for `max.block.ms` ms when encountering the UnknownTopicOrPartitionException.
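
Purely for illustration, a rough sketch of the shape described in (1)-(3) might look like the following. The `Response` enum name, the `byte[]` generics, and the default return values are my own guesses for readability, not part of the KIP text:

import java.io.Closeable;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

// Sketch only -- names and defaults are illustrative, not the KIP's final API.
public interface ProducerExceptionHandler extends Closeable {

    enum Response { FAIL, SWALLOW, RETRY }

    // Too-large records can only be failed or swallowed; RETRY is treated as FAIL.
    default Response handle(RecordTooLargeException e, ProducerRecord<byte[], byte[]> record) {
        return Response.FAIL;
    }

    // Missing metadata may be transient, so RETRY is meaningful here.
    default Response handle(UnknownTopicOrPartitionException e, ProducerRecord<byte[], byte[]> record) {
        return Response.RETRY;
    }
}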


Hope the main concerns are addressed so that we can go forward with voting.


Cheers,

Alieh

On Thu, May 9, 2024 at 11:25 PM Artem Livshits
<alivsh...@confluent.io.invalid> wrote:

Hi Mathias,

[AL1] While I see the point, I would think having a different callback for every exception might not really be elegant?

I'm not sure how to assess the level of elegance of the proposal, but I can comment on the technical characteristics:

1. Having specific interfaces that codify the logic that is currently prescribed in the comments reduces the chance of making a mistake. Comments may get ignored, misunderstood, etc., but if the contract is codified, the compiler will help to enforce the contract.
2. Given that the logic is trickier than it seems (the record-too-large case is an example that can easily confuse someone who's not intimately familiar with the nuances of the batching logic), having a few more hoops to jump through would give a greater chance that whoever tries to add a new case pauses and thinks a bit more.
3. As Justine pointed out, having different methods will be a forcing function to go through a KIP rather than smuggle new cases through the implementation.
4. Sort of a consequence of the previous 3 -- all those things reduce the chance of someone writing code that works with 2 errors and then, when more errors are added in the future, suddenly and incorrectly ignores the new errors (the example I gave in the previous email).

[AL2 cont.] Similar to AL1, I see such a handler to some extent as business logic. If a user puts a bad filter condition in their KS app, and drops messages

I agree that there is always a chance to get a bug and lose messages, but there is generally a separation of concerns with different risk profiles: the filtering logic may be more rigorously tested and rarely changed (say, an application developer does it), but setting the topics to produce to may be done via configuration (e.g. a user of the application does it), and it's generally an expectation that users would get an error when the configuration is incorrect.

What could be worse is that UnknownTopicOrPartitionException can be an intermittent error, i.e. with a generally correct configuration, there could be a metadata propagation problem on the cluster and then a random set of records could get lost.

[AL3] Maybe I misunderstand what you are saying, but to me, checking the size of the record upfront is exactly what the KIP proposes? No?

It achieves the same result but solves it differently. My proposal:

1. Application checks the validity of a record (maybe via a new validateRecord method) before producing it, and can just exclude it or return an error to the user.
2. Application produces the record -- at this point there are no records that could return record too large; they were either skipped at step 1 or we didn't get here because step 1 failed.

Vs. the KIP's proposal:

1. Application produces the record.
2. Application gets a callback.
3. Application returns the action on how to proceed.

The advantage of the former is the clarity of semantics -- the record is invalid (a property of the record, not a function of server state or server configuration) and we can clearly know that it is the record that is bad and can never succeed.

The KIP-proposed way actually has a very tricky point: it actually handles only a subset of record-too-large exceptions. The broker can return record-too-large and reject the whole batch (but we don't want to ignore those, because then we could skip random records that just happened to be in the same batch); in some sense we use the same error for 2 different conditions, and understanding that requires a pretty deep understanding of Kafka internals.
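
To make step 1 above concrete: a validateRecord method does not exist today, but an application-side pre-check along those lines could look roughly like the sketch below. It approximates "too large" by comparing serialized key/value sizes against the producer's max.request.size and ignores batch/header overhead, so treat it as an illustration of the idea rather than an exact equivalent of the producer's own checks.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class UpfrontSizeCheck {
    // Mirror of the producer's max.request.size config (1 MB default).
    private static final int MAX_REQUEST_SIZE = 1_048_576;

    // Returns true if the record was sent, false if it was skipped as too large.
    public static boolean sendIfSmallEnough(KafkaProducer<byte[], byte[]> producer,
                                            ProducerRecord<byte[], byte[]> record) {
        int keySize = record.key() == null ? 0 : record.key().length;
        int valueSize = record.value() == null ? 0 : record.value().length;
        if (keySize + valueSize > MAX_REQUEST_SIZE) {
            // Invalid record: skip it (or surface an error) instead of producing it.
            return false;
        }
        producer.send(record);
        return true;
    }
}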

-Artem


On Wed, May 8, 2024 at 9:47 AM Justine Olshan
<jols...@confluent.io.invalid

wrote:

My concern with respect to it being fragile: the code that ensures the error type is internal to the producer. Someone may see it and say, I want to add such and such error. This looks like internal code, so I don't need a KIP, and then they can change it to whatever they want, thinking it is within the typical Kafka improvement protocol.

Relying on an internal change to enforce an external API is fragile in my opinion. That's why I sort of agreed with Artem on enforcing the error in the method signature -- part of the public API.

Chris's comments on requiring more information in the handler again make me wonder if we are solving a problem of lack of information at the application level with a more powerful solution than we need. (I.e., if we had more information, could the application close and restart the transaction rather than having to drop records?) But I am happy to compromise with a handler that we can agree is sufficiently controlled and documented.

Justine

On Wed, May 8, 2024 at 7:20 AM Chris Egerton
<chr...@aiven.io.invalid

wrote:

Hi Alieh,

Continuing prior discussions:

1) Regarding the "flexibility" discussion, my overarching point is that I don't see the point in allowing for this kind of pluggable logic without also covering more scenarios. Take example 2 in the KIP: if we're going to implement retries only on "important" topics when a topic partition isn't found, why wouldn't we also want to be able to do this for other errors? Again, taking authorization errors as an example, why wouldn't we want to be able to fail when we can't write to "important" topics because the producer principal lacks sufficient ACLs, and drop the record if the topic isn't "important"? In a security-conscious environment with runtime-dependent topic routing (which is a common feature of many source connectors, such as the Debezium connectors), this seems fairly likely.

2) As far as changing the shape of the API goes, I like Artem's idea of splitting out the interface based on specific exceptions. This may be a little laborious to expand in the future, but if we really want to limit the exceptions that we cover with the handler and move slowly and cautiously, then IMO it'd be reasonable to reflect that in the interface. I also acknowledge that there's no way to completely prevent people from shooting themselves in the foot by implementing the API incorrectly, but I think it's worth it to do what we can--including leveraging the Java language's type system--to help them, so IMO there's value to eliminating the implicit behavior of failing when a policy returns RETRY for a non-retriable error. This can take a variety of shapes and I'm not going to insist on anything specific, but I do want to again raise my concerns with the current proposal and request that we find something a little better.

3) Concerning the default implementation--actually, I meant what I wrote :) I don't want a "second" default, I want an implementation of this interface to be used as the default if no others are specified. The behavior of this default implementation would be identical to existing behavior (so there would be no backwards compatibility concerns like the ones raised by Matthias), but it would be possible to configure this default handler class to behave differently for a basic set of scenarios. This would mirror (pun intended) the approach we've taken with Mirror Maker 2 and its ReplicationPolicy interface [1]. There is a default implementation available [2] that recognizes a handful of basic configuration properties [3] for simple tweaks, but if users want, they can also implement their own replication policy for more fine-grained logic if those properties aren't flexible enough.

More concretely, I'm imagining something like this for the producer exception handler:

- Default implementation class of org.apache.kafka.clients.producer.DefaultProducerExceptionHandler
- This class would recognize two properties:
  - drop.invalid.large.records: Boolean property, defaults to false. If "false", then causes the handler to return FAIL whenever a RecordTooLargeException is encountered; if "true", then causes SWALLOW/SKIP/DROP to be returned instead
  - unknown.topic.partition.retry.timeout.ms: Integer property, defaults to INT_MAX. Whenever an UnknownTopicOrPartitionException is encountered, causes the handler to return FAIL if that record has been pending for more than the retry timeout; otherwise, causes RETRY to be returned
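
For concreteness, here is a hedged sketch of how such a default handler might be wired up by a user. Neither the handler class nor the two property names exist today -- they are exactly the names proposed above and could change if the KIP evolves:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class DefaultHandlerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        // Proposed (not yet existing) default handler plus its two tweakable properties:
        props.put("custom.exception.handler.class",
                "org.apache.kafka.clients.producer.DefaultProducerExceptionHandler");
        props.put("drop.invalid.large.records", "true");                // drop oversized records
        props.put("unknown.topic.partition.retry.timeout.ms", "60000"); // give up after 60s of retries
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // Use the producer as usual; with these settings, oversized records would be
            // skipped and unknown-topic retries would stop after the configured timeout.
        }
    }
}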

I think this is worth addressing now instead of later because it forces us to evaluate the usefulness of this interface and it addresses a long-standing issue not just with Kafka Connect, but with the Java producer in general. For reference, here are a few tickets I collected after briefly skimming our Jira showing that this is a real pain point for users: https://issues.apache.org/jira/browse/KAFKA-10340, https://issues.apache.org/jira/browse/KAFKA-12990, https://issues.apache.org/jira/browse/KAFKA-13634. Although this is frequently reported with Kafka Connect, it applies to anyone who configures a producer to use a high retry timeout. I am aware of the max.block.ms property, but it's painful and IMO poor behavior to require users to reduce the value of this property just to handle the single scenario of trying to write to topics that don't exist, since it would also limit the retry timeout for other operations that are legitimately retriable.

Raising new points:

5) I don't see the interplay between this handler and existing retry-related properties mentioned anywhere in the KIP. I'm assuming that properties like "retries", "max.block.ms", and "delivery.timeout.ms" would take precedence over the handler and, once they are exhausted, the record/batch will fail no matter what? If so, it's probably worth briefly mentioning this (no more than a sentence or two) in the KIP, and if not, I'm curious what you have in mind.

6) I also wonder if the API provides enough information in its current form. Would it be possible to provide handlers with some way of tracking how long a record has been pending for (i.e., how long it's been since the record was provided as an argument to Producer::send)? One way to do this could be to add a method like `onNewRecord(ProducerRecord)` and allow/require the handler to do its own bookkeeping, probably with a matching `onRecordSuccess(ProducerRecord)` method so that the handler doesn't eat up an ever-increasing amount of memory trying to track records. An alternative could be to include information about the initial time the record was received by the producer and the number of retries that have been performed for it as parameters in the handle method(s), but I'm not sure how easy this would be to implement and it might clutter things up a bit too much.
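
As an illustration of the bookkeeping idea (the `onNewRecord`/`onRecordSuccess` hooks above are hypothetical, and this sketch glosses over the fact that two records with equal contents would share an entry):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch: remember when a record was first seen and use the elapsed time to
// decide between RETRY and FAIL, removing entries on success to bound memory.
public class TimeBoundedRetryBookkeeping {
    private final Map<ProducerRecord<?, ?>, Long> firstSeen = new ConcurrentHashMap<>();
    private final long retryTimeoutMs;

    public TimeBoundedRetryBookkeeping(long retryTimeoutMs) {
        this.retryTimeoutMs = retryTimeoutMs;
    }

    public void onNewRecord(ProducerRecord<?, ?> record) {
        firstSeen.putIfAbsent(record, System.currentTimeMillis());
    }

    public void onRecordSuccess(ProducerRecord<?, ?> record) {
        firstSeen.remove(record); // stop tracking, so memory does not grow without bound
    }

    public boolean shouldKeepRetrying(ProducerRecord<?, ?> record) {
        Long start = firstSeen.get(record);
        return start != null && System.currentTimeMillis() - start < retryTimeoutMs;
    }
}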

7) A small request--can we add Closeable (or, if you prefer, AutoCloseable) as a superinterface for the handler interface?

[1] - https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/ReplicationPolicy.html
[2] - https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html
[3] - https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#SEPARATOR_CONFIG,
https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG

Cheers,

Chris

On Wed, May 8, 2024 at 12:37 AM Matthias J. Sax <mj...@apache.org> wrote:

Very interesting discussion.

Seems a central point is the question about "how generic" we approach this, and some people think we need to be conservative and others think we should try to be as generic as possible.

Personally, I think following a limited scope for this KIP (by explicitly saying we only cover RecordTooLarge and UnknownTopicOrPartition) might be better. We have concrete tickets that we address, while for other exceptions (like authorization) we don't know if people want to handle them to begin with. Boiling the ocean might not get us too far, and being somewhat pragmatic might help to move this KIP forward. -- I also agree with Justine and Artem that we want to be careful anyway to not allow users to break stuff too easily.

At the same time, I agree that we should set up this change in a forward-looking way, and thus having a single generic handler allows us to later extend the handler more easily. This should also simplify follow-up KIPs that might add new error cases. (I actually mentioned one more to Alieh already, but we both agreed that it might be best to exclude it from the KIP right now, to make the 3.8 deadline. Doing a follow-up KIP is not the end of the world.)



@Chris:

(2) This sounds fair to me, but I'm not sure how "bad" it actually would be? If the contract is clearly defined, what the KIP proposes seems to be fine, and given that such a handler is an expert API, and we can provide "best practices" (cf. my other comment below in [AL1]), being a little bit pragmatic sounds fine to me.

Not sure if I understand Justine's argument on this question?


(3) About having a default impl or not: I am fine with adding one, even if I am not sure why Connect could not just add its own one and plug it in (and we would add corresponding configs for Connect, but not for the producer itself)? For this case, we could also do this as a follow-up KIP, but I'm happy to include it in this KIP to provide value to Connect right away (even if the value might not come right away if we miss the 3.8 deadline due to expanded KIP scope...). -- For KS, we would for sure plug in our own impl, and lock down the config such that users cannot set their own handler on the internal producer to begin with. Might be good to elaborate on why the producer should have a default? We might actually want to add this to the KIP right away?

The key for a default impl would be to not change the current behavior, and having no default seems to achieve this. For the two cases you mentioned, it's unclear to me what default value of "upper bound on retries" for UnknownTopicOrPartitionException we should set? Seems it would need to be the same as `delivery.timeout.ms`? However, if users have `delivery.timeout.ms` actually overwritten, we would need to set this config somewhat "dynamically"? Is this feasible? If we hard-code 2 minutes, it might not be backward compatible. I have the impression we might introduce some undesired coupling? -- For the "record too large" case, the config seems to be boolean, and setting it to `false` by default seems to provide backward compatibility.



@Artem:

[AL1] While I see the point, I would think having a different callback for every exception might not really be elegant? In the end, the handler is a very advanced feature anyway, and if it's implemented in a bad way, well, it's a user error -- we cannot protect users from everything. To me, a handler like this is to some extent "business logic", and if a user gets business logic wrong, it's hard to protect them. -- We would of course provide best-practice guidance in the JavaDocs, and explain that a handler should have explicit `if` statements for the stuff it wants to handle, and only a single default which returns FAIL.


[AL2] Yes, but for KS we would retry at the application layer. I.e., the TX is not completed, we clean up and set up our task from scratch, to ensure the pending transaction is completed before we resume. If the TX was indeed aborted, we would retry from the older offset and thus just hit the same error again, and the loop begins again.


[AL2 cont.] Similar to AL1, I see such a handler to some extent as business logic. If a user puts a bad filter condition in their KS app and drops messages, there is nothing we can do about it, and this handler, IMHO, has a similar purpose. This is also the line of thinking I apply to EOS, to address Justine's concern about "should we allow to drop for EOS", and my answer is "yes", because it's more business logic than actual error handling IMHO. And by default, we fail... So users opt in to add business logic to drop records. It's an application-level decision how to write the code.


[AL3] Maybe I misunderstand what you are saying, but to me, checking the size of the record upfront is exactly what the KIP proposes? No?



@Justine:

I saw the sample code -- is it just an if statement checking for the error before the handler is invoked? That seems a bit fragile.

What do you mean by fragile? Not sure if I see your point.




-Matthias

On 5/7/24 5:33 PM, Artem Livshits wrote:
Hi Alieh,

Thanks for the KIP. The motivation talks about very specific cases, but the interface is generic.

[AL1]
If the interface evolves in the future, I think we could have the following confusion:

1. A user implemented the SWALLOW action for both RecordTooLargeException and UnknownTopicOrPartitionException. For simplicity they just return SWALLOW from the function, because it elegantly handles all known cases.
2. The interface has evolved to support a new exception.
3. The user has upgraded their Kafka client.

Now a new kind of error gets dropped on the floor without the user's intention, and it would be super hard to detect and debug.

To avoid the confusion, I think we should use handlers for specific exceptions. Then if a new exception is added it won't get silently swallowed, because the user would need to add new functionality to handle it.

I also have some higher level comments:

[AL2]
it throws a TimeoutException, and the user can only blindly retry, which may result in an infinite retry loop

If the TimeoutException happens during transactional processing (exactly once is the desired semantics), then the client should not retry when it gets a TimeoutException, because without knowing the reason for the TimeoutException, the client cannot know whether the message actually got produced or not, and retrying the message may result in duplicates.

The thrown TimeoutException "cuts" the connection to the underlying root cause of missing metadata

Maybe we should fix the error handling and return the proper underlying message? Then the application can properly handle the message based on its preferences.

From the product perspective, it's not clear how safe it is to blindly ignore UnknownTopicOrPartitionException. This could lead to situations where a simple typo could lead to massive data loss (part of the data would effectively be produced to a "black hole" and the user may not notice it for a while).

In which situations would you recommend the user to "black hole" messages in case of misconfiguration?

[AL3]

If the custom handler decides on SWALLOW for RecordTooLargeException,

Is my understanding correct that this KIP proposes functionality that would only be able to SWALLOW a RecordTooLargeException that happens because the producer cannot produce the record (if the broker rejects the batch, the error won't get to the handler, because we cannot know which other records get ignored)? In this case, why not just check the locally configured max record size upfront and not produce the record in the first place? Maybe we can expose a validation function from the producer that could validate the records locally, so we don't need to produce the record in order to know that it's invalid.

-Artem

On Tue, May 7, 2024 at 2:07 PM Justine Olshan
<jols...@confluent.io.invalid>
wrote:

Alieh and Chris,

Thanks for clarifying 1), but I saw the motivation. I guess I just didn't understand how that would be ensured on the producer side. I saw the sample code -- is it just an if statement checking for the error before the handler is invoked? That seems a bit fragile.

Can you clarify what you mean by `since the code does not reach the KS interface and breaks somewhere in producer`? If we surfaced this error to the application in a better way, would that also be a solution to the issue?

Justine

On Tue, May 7, 2024 at 1:55 PM Alieh Saeedi
<asae...@confluent.io.invalid>
wrote:

Hi,


Thank you, Chris and Justine, for the feedback.


@Chris

1) Flexibility: it has two meanings. The first meaning is the one you mentioned. We are going to cover more exceptions in the future, but as Justine mentioned, we must be very conservative about adding more exceptions. Additionally, flexibility mainly means that the user is able to develop their own code. As mentioned in the motivation section and the examples, sometimes the user decides on dropping a record based on the topic, for example.


2) Defining two separate methods for retriable and non-retriable exceptions: although the idea is brilliant, the user may still make a mistake by implementing the wrong method and see unexpected behaviour. For example, he may implement handleRetriable() for RecordTooLargeException and define SWALLOW for the exception, but in practice he sees no change in default behaviour since he implemented the wrong method. I think we can never reduce the user’s mistakes to 0.



3) Default implementation for Handler: the default behaviour is already preserved with NO need of implementing any handler or setting the corresponding config parameter `custom.exception.handler`. What you mean is actually having a second default, which requires having both the interface and config parameters. About UnknownTopicOrPartitionException: the producer already offers the config parameter `max.block.ms`, which determines the duration of retrying. The main purpose of the user who needs the handler is to get the root cause of the TimeoutException and handle it in the way he intends. The KIP explains the necessity of it for KS users.


4) Naming issue: By SWALLOW, we meant actually swallowing the error, while SKIP means skipping the record, I think. If it makes sense for more ppl, I can change it to SKIP.


@Justine

1) was addressed by Chris.

2 and 3) The problem is exactly what you mentioned. Currently, there is no way to handle these issues application-side. Even KS users who implement the KS ProductionExceptionHandler are not able to handle the exceptions as they intend, since the code does not reach the KS interface and breaks somewhere in the producer.

Cheers,
Alieh

On Tue, May 7, 2024 at 8:43 PM Chris Egerton <
fearthecel...@gmail.com>
wrote:

Hi Justine,

The method signatures for the interface are indeed open-ended, but the KIP states that its uses will be limited. See the motivation section:

We believe that the user should be able to develop custom exception handlers for managing producer exceptions. On the other hand, this will be an expert-level API, and using that may result in strange behaviour in the system, making it hard to find the root cause. Therefore, the custom handler is currently limited to handling RecordTooLargeException and UnknownTopicOrPartitionException.

Cheers,

Chris


On Tue, May 7, 2024, 14:37 Justine Olshan
<jols...@confluent.io.invalid

wrote:

Hi Alieh,

I was out for KSB and then was also sick. :(

To your point 1) Chris, I don't think it is limited to two specific scenarios, since the interface accepts a generic Exception e and can be implemented to check if that e is an instanceof any exception. I didn't see anywhere that specific errors are enforced. I'm a bit concerned about this actually. I'm concerned about the open-endedness and the contract we have with transactions. We are allowing the client to make decisions that are somewhat invisible to the server. As an aside, can we build in log messages when the handler decides to skip etc. a message? I'm really concerned about messages being silently dropped.

I do think Chris's point 2) about retriable vs non-retriable errors is fair. I'm a bit concerned about skipping an unknown topic or partition exception too early, as there are cases where it can be transient.

I'm still a little bit wary of allowing dropping records as part of EOS generally as, in many cases, these errors signify an issue with the original data. I understand that Streams and Connect/MirrorMaker may have reasons they want to progress past these messages, but I'm wondering if there is a way that can be done application-side. I'm willing to accept this sort of proposal if we can make it clear that this sort of thing is happening and we limit the blast radius for what we can do.

Justine

On Tue, May 7, 2024 at 9:55 AM Chris Egerton
<chr...@aiven.io.invalid

wrote:

Hi Alieh,

Sorry for the delay, I've been out sick. I still have some thoughts that I'd like to see addressed before voting.

1) If flexibility is the motivation for a pluggable interface, why are we only limiting the uses for this interface to two very specific scenarios? Why not also allow, e.g., authorization errors to be handled as well (allowing users to drop records destined for some off-limits topics, or retry for a limited duration in case there's a delay in the propagation of ACL updates)? It'd be nice to see some analysis of other errors that could be handled with this new API, both to avoid the follow-up work of another KIP to address them in the future, and to make sure that we're not painting ourselves into a corner with the current API in a way that would make future modifications difficult.

2) Something feels a bit off with how retriable vs. non-retriable errors are handled with the interface. Why not introduce two separate methods to handle each case separately? That way there's no ambiguity or implicit behavior when, e.g., attempting to retry on a RecordTooLargeException. This could be something like `NonRetriableResponse handle(ProducerRecord, Exception)` and `RetriableResponse handleRetriable(ProducerRecord, Exception)`, though the exact names and shape can obviously be toyed with a bit (a rough sketch follows below).
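
As a purely illustrative sketch of that split-by-retriability shape (names, enum members, and parameter order are placeholders, not a settled API):

import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch only: separate return types make it impossible to ask for RETRY on a
// non-retriable error, which is the ambiguity described above.
public interface ProducerExceptionHandler {

    enum NonRetriableResponse { FAIL, SWALLOW }

    enum RetriableResponse { FAIL, SWALLOW, RETRY }

    // Called for errors that can never succeed on retry (e.g. record too large).
    NonRetriableResponse handle(ProducerRecord<?, ?> record, Exception exception);

    // Called for errors that may be transient (e.g. unknown topic or partition).
    RetriableResponse handleRetriable(ProducerRecord<?, ?> record, Exception exception);
}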

3) Although the flexibility of a pluggable interface may benefit some users' custom producer applications and Kafka Streams applications, it comes at significant deployment cost for other low-/no-code environments, including but not limited to Kafka Connect and MirrorMaker 2. Can we add a default implementation of the exception handler that allows for some simple behavior to be tweaked via configuration property? Two things that would be nice to have would be A) an upper bound on the retry time for unknown-topic-partition exceptions and B) an option to drop records that are large enough to trigger a record-too-large exception.

4) I'd still prefer to see "SKIP" or "DROP" instead of the proposed "SWALLOW" option, which IMO is opaque and non-obvious, especially when trying to guess the behavior for retriable errors.

Cheers,

Chris

On Fri, May 3, 2024 at 11:23 AM Alieh Saeedi
<asae...@confluent.io.invalid

wrote:

Hi all,


A summary of the KIP and the discussions:


The KIP introduces a handler interface for the Producer in order to handle two exceptions: RecordTooLargeException and UnknownTopicOrPartitionException. The handler handles the exceptions per record.


- Do we need this handler?  [Motivation and Examples sections]


RecordTooLargeException: 1) In transactions, the producer collects multiple records in batches. Then a RecordTooLargeException related to a single record leads to failing the entire batch. A custom exception handler in this case may decide on dropping the record and continuing the processing. See Example 1, please. 2) Moreover, in Kafka Streams, a record that is too large is a poison pill record, and there is no way to skip over it. A handler would allow us to react to this error inside the producer, i.e., local to where the error happens, and thus simplify the overall code significantly. Please read the Motivation section for more explanation.


UnknownTopicOrPartitionException: For this case, the producer handles this exception internally, only issues a WARN log about missing metadata, and retries internally. Later, when the producer hits "delivery.timeout.ms", it throws a TimeoutException, and the user can only blindly retry, which may result in an infinite retry loop. The thrown TimeoutException "cuts" the connection to the underlying root cause of missing metadata (which could indeed be a transient error but is persistent for a non-existing topic). Thus, there is no programmatic way to break the infinite retry loop. Kafka Streams also blindly retries for this case, and the application gets stuck.



- Having an interface vs. a configuration option: [Motivation, Examples, and Rejected Alternatives sections]

Our solution is introducing an interface due to the full flexibility that it offers. Sometimes users, especially Kafka Streams ones, determine the handler's behaviour based on the situation. For example, facing UnknownTopicOrPartitionException, the user may want to raise an error for some topics but retry it for other topics. Having a configuration option with a fixed set of possibilities does not serve the user's needs. See Example 2, please.


- Note on RecordTooLargeException: [Public Interfaces section]

If the custom handler decides on SWALLOW for RecordTooLargeException, then this record will not be a part of the batch of transactions and will also not be sent to the broker in non-transactional mode. So no worries about getting a RecordTooLargeException from the broker in this case, as the record will never ever be sent to the broker. SWALLOW means drop the record and continue/swallow the error.


- What if the handle() method implements RETRY for RecordTooLargeException? [Proposed Changes section]

We have to limit the user to only have FAIL or SWALLOW for RecordTooLargeException. Actually, RETRY must be equal to FAIL. This is well documented/informed in the javadoc.


- What if the handle() method of the handler throws an exception?

The handler is expected to have correct code. If it throws an exception, everything fails.



This is a PoC PR <https://github.com/apache/kafka/pull/15846> ONLY for RecordTooLargeException. The code changes related to UnknownTopicOrPartitionException will be added to this PR LATER.


Looking forward to your feedback again.


Cheers,

Alieh

On Thu, Apr 25, 2024 at 11:46 PM Kirk True <
k...@kirktrue.pro>
wrote:

Hi Alieh,

Thanks for the updates!

Comments inline...


On Apr 25, 2024, at 1:10 PM, Alieh Saeedi <asae...@confluent.io.INVALID> wrote:

Hi all,

Thanks a lot for the constructive feedbacks!



Addressing some of the main concerns:


- The `RecordTooLargeException` can be thrown by the broker, the producer, and the consumer. Of course, the `ProducerExceptionHandler` interface is introduced to affect only the exceptions thrown from the producer. This KIP very specifically means to provide a possibility to manage the `RecordTooLargeException` thrown from the Producer.send() method. Please see the "Proposed Changes" section for more clarity. I investigated the issue there thoroughly. I hope it can explain the concern about how we handle the errors as well.



- The problem with Callback: Methods of Callback are called when the record sent to the server is acknowledged, while this is not the desired time for all exceptions. We intend to handle exceptions beforehand.

I guess it makes sense to keep the expectation for when Callback is invoked as-is vs. shoehorning more into it.

- What if the custom handler returns RETRY for `RecordTooLargeException`? I assume changing the producer configuration at runtime is possible. If so, RETRY for a too-large record is valid because maybe in the next try, the too-large record is not poisoning any more. I am not 100% sure about the technical details, though. Otherwise, we can consider the RETRY as FAIL for this exception. Another solution would be to consider a constant number of times for RETRY, which can be useful for other exceptions as well.

It’s not presently possible to change the configuration of an existing Producer at runtime. So if a record hits a RecordTooLargeException once, no amount of retrying (with the current Producer) will change that fact. So I’m still a little stuck on how to handle a response of RETRY for an “oversized” record.

- What if the handle() method itself throws an exception? I think, rationally and pragmatically, the behaviour must be exactly like when no custom handler is defined, since the user actually did not have a working handler.

I’m not convinced that ignoring an errant handler is the right choice. It then becomes a silent failure that might have repercussions, depending on the business logic. A user would have to proactively trawl through the logs for WARN/ERROR messages to catch it.

Throwing a hard error is pretty draconian, though…

- Why not use config parameters instead of an interface? As explained in the “Rejected Alternatives” section, we assume that the handler will be used for a greater number of exceptions in the future. Defining a configuration parameter for each exception may make the configuration a bit messy. Moreover, the handler offers more flexibility.

Agreed that the logic-via-configuration approach is weird and limiting. Forget I ever suggested it ;)

I’d think additional background in the Motivation section would help me understand how users might use this feature beyond a) skipping “oversized” records, and b) not retrying missing topics.

Small change:

- ProductionExceptionHandlerResponse -> Response, for brevity and simplicity. Could’ve been HandlerResponse too, I think!

The name change sounds good to me.

Thanks Alieh!



I thank you all again for your useful questions/suggestions.

I would be happy to hear more of your concerns, as stated in some feedback.

Cheers,
Alieh

On Wed, Apr 24, 2024 at 12:31 AM Justine Olshan
<jols...@confluent.io.invalid> wrote:

Thanks Alieh for the updates.

I'm a little concerned about the design pattern here. It seems like we want specific usages, but we are packaging it as a generic handler. I think we tried to narrow down on the specific errors we want to handle, but it feels a little clunky as we have a generic thing for two specific errors.

I'm wondering if we are using the right patterns to solve these problems. I agree though that we will need something more than the error classes I'm proposing if we want to have different handling be configurable. My concern is that the open-endedness of a handler means that we are creating more problems than we are solving. It is still unclear to me how we expect to handle the errors. Perhaps we could include an example? It seems like there is a specific use case in mind and maybe we can



