I agree with Alieh and Artem -- in the end, why buffer records twice? We effectively want to allow pushing some error handling (which I btw consider "business logic") into the producer. IMHO, there is nothing wrong with it. Dropping a poison pill record is not really a violation of atomicity from my POV, but a business logic decision to not include a record in a transaction -- the proposed API just makes it much simpler to achieve this business logic goal.


For memory size estimation, throughput or message size is actually not relevant, right? We would need to look at the producer buffer size, i.e., `batch.size` and `max.in.flight.requests.per.connection`, and guesstimate the number of connections there might be. At least for KS, we don't need to buffer everything until commit, but only until we get a successful "ack" back.

Note that a KS application not only needs to write to a (single) user result topic, but to multiple output topics, as well as repartition and changelog topics, across all tasks assigned to a thread (i.e., producer), which can easily be 10 tasks or more. If we assume topics with 30 partitions (topics with 50 or more partitions are not uncommon either), and a producer that must write to 10 different topics, the number of required connections very quickly gets very high, and thus the required "application buffer space" would be significant.
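Just to illustrate the order of magnitude (the numbers below are purely illustrative assumptions, not measurements):

    10 topics x 30 partitions = 300 partitions written to, per producer
    one in-progress batch per partition, default batch.size = 16 KB:
        300 x 16 KB  ~=   5 MB of producer-side buffer state
    throughput-tuned batch.size = 1 MB:
        300 x 1 MB   ~= 300 MB, per producer (i.e., per thread)

And an application-side buffer would have to hold at least every record that was sent but not yet acked, so it scales with the same factors.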



Your other ideas don't seem to be viable alternatives:

Streams users that specifically want to drop oversize records can
estimate the size of their data and drop records which are too
large, enforcing their own limits that are lower than the Kafka limits.

"Estimation" is not sufficient, but we would need to know it exactly. And that's an impl detail, given that the message format could change and we could add new internal fields increasing the message size. The idea to add some `producer.serializedRecordSize()` helper method was discussed, but it's a very ugly API and clumsy to use -- also, the user code would need to know the producer config which it might not have access to (as it might get passed in from some config file; and it might also be changed).

Some other alternative we also discussed was to let `send()` throw an exception for the "record too large" case directly. However, this solution raises backward compatibility concerns, and it might also not help us to extend the solution in the future (e.g., tackle broker-side errors). So we discarded this idea.



Streams users that want CONTINUE semantics can use at_least_once
semantics

Not really. EOS is mainly about not having duplicates in the result, but at-least-once cannot provide this guarantee. (Even if I repeat myself: dropping a poison pill record based on a business logic decision is not data loss, but effectively a business logic filter...)



Streams itself can store record hashes/coordinates and fast rewind to
the end of the last transaction, recomputing data rather than storing it.

Given the very complex nature of topologies, with joins, aggregations, flatMaps, etc., this is a 100x more complex solution and not viable in practice.



Streams can define exactly_once + CONTINUE semantics to permit the whole
transaction to be dropped, because it would allow the next batch to be
started processing.

Would this not be much worse? I have a single poison pill record and would need to drop a full tx (this could be tens of thousands of records...). Also, given that KS writes into changelog topics in the same TX, this could break the whole application.



Streams can emit records with both a transactional and non-transactional
producer if some records are not critical-path

We (1) already have a "too many connections" problem with KS, so using more clients is something we try to avoid (and we actually hope to reduce the number of clients and connections mid to long term), (2) this would be very hard to express at the API level to the user, and (3) it would provide very weird semantics.



they should optimize for smaller transactions,

IMHO, this would not work in practice because transactions have a high overhead, and the commit interval is used to trade off throughput vs end-to-end latency. Given a certain throughput requirement, it would not be possible to just use a lower commit interval to reduce memory requirements.



-Matthias




On 7/15/24 2:25 PM, Artem Livshits wrote:
Hi Greg,

This makes me think that this IGNORE_SEND_ERRORS covers an arbitrary set
of error conditions that may be expanded in the future, possibly to cover
the broker side RecordTooLargeException.

I don't think it contradicts what I said (the keyword here is "in the future") -- with the current functionality, the correct way to handle RTLE is to only let the client ignore client-originated RTLE (this can be easily implemented on the client side). In the future, we can improve on that by making the broker return a different exception for the batch-too-large case; then the producer would be able to return broker-side exceptions as well (and if the application chooses to ignore one, it will be able to, but it would be an explicit choice rather than ignoring it by mistake). In this case the producer client would encapsulate the backward compatibility logic when it connects to older brokers, to make sure the application doesn't accidentally get an RTLE originated by an old broker. This functionality is obviously more involved and we'll need to see if going all the way is justified, but the partial client-only solution doesn't close the door.

So one way to look at the current situation is the following:

1. We can do a low effort partial solution to solve a real existing problem. We can easily prove that it would do exactly what it needs to do with minimal risk of regression.
2. We have a path to a more comprehensive solution, so if we justify the effort required for that, we can get there.

BTW, as a side note (I think I saw a question in the thread), we do try to introduce error categories here
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1050%3A+Consistent+error+handling+for+Transactions
so eventually we may have a better classification for the errors.

"if a streams producer is producing 1MB/s, and the commit interval is 1
hour, I expect 3600MB of additional heap needed ...

Agree, that would be ideal. On the other hand, the effort to prove that keeping all records in memory won't break some scenarios (and generally breaking one is enough to cause a lot of pain) seems to be significantly higher than to prove that setting some flag in some API has pretty much zero chance of regression (we basically have a flag to say "unfix KAFKA-9279", so we're getting to a fairly "known good" state). I'll let KStream folks comment on this one (and we still need to solve the problem of accidental handling of RTLE originated from the broker, so some KIP would be required to somehow help differentiate those).

-Artem

On Mon, Jul 15, 2024 at 1:31 PM Greg Harris <greg.har...@aiven.io.invalid> wrote:

Hi Artem,

Thank you for clarifying as I'm joining the conversation late and may have
some misconceptions.

Because of this, a more "complete" solution that
allows ignoring RecordTooLargeException regardless of its origin is
actually incorrect, while a "partial" solution that allows ignoring
RecordTooLargeException only originating in client code accomplishes the
required functionality.

This is not how I understood this feature. Above Matthias said the
following:

We can do
follow up KIP for other errors on an on-demand basis and fix-forward /
enlarge the scope successively.

This makes me think that this IGNORE_SEND_ERRORS covers an arbitrary set of
error conditions that may be expanded in the future, possibly to cover the
broker side RecordTooLargeException.

Obviously, we could solve this problem by changing logic in the
broker to return a different error when the batch is too large, but right
now this is not the case

If the broker/wire protocol isn't ready for these errors to be propagated,
then I don't think we're ready to add this API. It's going to be
under-generalized, and there's a decent chance that we're going to regret
the design choices in the future. And users that expect it to be fully
generalized are going to be disappointed when they don't read the fine
print and still get faulted by non-covered errors.

AL2.  In a high performance system, "just an optimization" can be a
functional requirement ...
  I just wanted to make the point that we shouldn't necessarily dismiss
API changes that allow for optimizations.

My earlier statement didn't dismiss this feature as "just an optimization"; actually the opposite. I said that performance could be a justification, but only if it is quantified and stated explicitly. We shouldn't be voting on hand-wavy optimizations; we should be voting on things that are quantifiable.
For example, an analysis like the following would facilitate further discussion: "if a streams producer is producing 1MB/s, and the commit interval is 1 hour, I expect 3600MB of additional heap needed per producer". We can then discuss whether we expect higher or lower throughput, commit intervals, or heap usage to determine what the operating envelope of this feature could be.
If there are a substantial number of users that have high throughput, long commit intervals, _and_ RTLEs, then this feature could make sense. If not, then the downsides of this feature (complication of the API, under-specification of the error coverage, etc.) look unjustified. In fact, if the number of users regularly encountering RTLEs is sufficiently small, I would strongly advocate for an application-specific workaround instead of trying to fix this in Streams, or making memory buffering an optional feature in Streams.

Thanks,
Greg

On Mon, Jul 15, 2024 at 1:29 PM Greg Harris <greg.har...@aiven.io> wrote:

Hi Alieh,

Thanks for your response.

what does a user do after a transaction is failed due to a `too-large-record` exception? They will submit the same batch without the problematic record again.

If they re-submit the same record, they are indicating that this record is an integral part of the transaction, and the transaction should only be committed with it present. If the record isn't integral to the transaction, they shouldn't submit it as part of the transaction.

Regarding your solution to solve the issue application-side:  I am a
bit hesitant to keep all sent records in memory since I think buffering
records twice (both in Streams and Producer) would not be an efficient
solution.

I understand your hesitation, and this touches on the "performance" caveat of the end-to-end arguments in system design. There are no perfect designs, and some API cleanliness may be sacrificed in favor of more performant solutions. You would need to make a concrete and convincing argument that the performance of this solution would be better than every alternative. To that end, I would recommend that you add more to the "Rejected Alternatives" section, as that is going to carry this proposal.
Some alternatives that I can think of, but which aren't necessarily better:
1. Streams users that specifically want to drop oversize records can
estimate the size of their data and drop records which are too
large, enforcing their own limits that are lower than the Kafka limits.
2. Streams users that want CONTINUE semantics can use at_least_once
semantics
3. Streams itself can store record hashes/coordinates and fast rewind to
the end of the last transaction, recomputing data rather than storing it.
4. Streams can define exactly_once + CONTINUE semantics to permit the
whole transaction to be dropped, because it would allow the next batch to
be started processing.
5. Streams can emit records with both a transactional and
non-transactional producer if some records are not critical-path

To generalize this point: Suppose an application tries to minimize storage costs by having only one party responsible for a piece of data at a time. They initially have the data, call send(), and want to know the earliest time they can forget the data and transfer the responsibility to Kafka. With a non-transactional producer, they are responsible for the data until the send() callback has succeeded. With a transactional producer, they are responsible for the data until commitTransaction() has succeeded.
With this proposed change that makes the producer tolerate too-large-exceptions, applications are still responsible for storing their data until commitTransaction() has succeeded, because abortTransaction() could have also been called, or the producer could have been fenced, or any number of other failures could have occurred. This feature does not enable Streams to drop responsibility earlier; it carves out a specific situation in which it doesn't have to rewind processing, which is a performance concern.

For Streams and the general case, if an application is trying to optimize storage costs, they should optimize for smaller transactions, because this both lowers the bound on record re-delivery and lowers the likelihood of a bad record being included in any individual transaction.

Thanks,
Greg

On Mon, Jul 15, 2024 at 12:35 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:

Hi Greg,

What you say makes a lot of sense.  I just wanted to clarify a couple of
subtle points.

AL1. There is a functional reason to handle errors that happen on send (originate in the producer logic in the client) vs. errors that are returned from the broker. The problem is that RecordTooLargeException is returned in two cases: (1) the producer logic on the client checks that the record is too large and throws the exception before doing anything with it -- this is a very "clean" situation with one specific record being marked as a "poison pill" and rejected; (2) the broker throws the same error if the batch is too large -- the batch may include multiple records and none of them would necessarily be a "poison pill" record, it's just a random misconfiguration of client vs. broker. Because of this, a more "complete" solution that allows ignoring RecordTooLargeException regardless of its origin is actually incorrect, while a "partial" solution that allows ignoring RecordTooLargeException only originating in client code accomplishes the required functionality. This is an important nuance and should be added to the KIP. Obviously, we could solve this problem by changing logic in the broker to return a different error when the batch is too large, but right now this is not the case (and to have the correct error handling we'd need to know the version of the broker, so we only drop the records if the error is returned from a broker that knows to return a different error).

AL2. In a high performance system, "just an optimization" can be a functional requirement -- if a solution impacts memory or computational complexity (in the sense of big-O notation) on the main code path, I can justify changing APIs to avoid such an impact. I'll let KStream folks comment on whether an implementation that requires storing records in memory actually violates the computational complexity on the main code path; I just wanted to make the point that we shouldn't necessarily dismiss API changes that allow for optimizations.

-Artem

On Fri, Jul 12, 2024 at 1:07 PM Greg Harris <greg.har...@aiven.io.invalid> wrote:

Hi all,

Alieh, thanks for the KIP! And everyone else, thanks for the robust
discussion.

I understand that there are situations in which users desire that the pipeline "just keep working" and skip errors. However, I question whether it is appropriate to support/encourage this behavior via inclusion in the Producer API.
This feature is essentially a "non-atomic transaction", as it allows commits in which not all records passed to send() ultimately get committed. As atomicity is one of the most important semantics associated with transactions, I question whether there are users other than Streams that would choose non-atomic transactions over a traditional/idempotent producer. Some cursory research shows that non-atomic transactions may be present in other databases, but are actively discouraged due to the complexity they add to error handling. [1]

I'd like to invoke the End-to-End Arguments in System Design [2] here, and recommend that this behavior may be present in Streams, but should not be in the Producer.
1. Dropping records that cause errors is already expressible via the current Producer API. You can store the records in-memory after calling send(), wait for a successful no-error flush() before calling commitTransaction() and allowing the records to be garbage collected. If errors occur, abortTransaction() and re-submit the records. (See the sketch after this list.)
2. Implementing this inside the Producer API is complex and difficult to holistically define in a way that we won't regret or need to change later. I think some of the disagreement in this thread originates from this, and I don't find the proposed API satisfactory.
3. The performance improvement of including this change in the lower level needs to be quantified in order to be a justification, and I don't see any analysis about this.
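For completeness, a minimal sketch of (1), assuming an already-configured transactional producer and a `batch` collection (illustrative application code, not a complete program):

    // Keep records application-side until the transaction commits, so the
    // batch can be re-submitted (minus any poison pills) after an abort.
    List<ProducerRecord<String, String>> pending = new ArrayList<>(batch);
    producer.beginTransaction();
    try {
        for (ProducerRecord<String, String> record : pending) {
            producer.send(record); // a failed send poisons the transaction
        }
        producer.flush();             // wait for all sends to complete
        producer.commitTransaction(); // throws if any send failed
        pending.clear();              // Kafka is now responsible for the data
    } catch (KafkaException e) {
        producer.abortTransaction();
        // drop the offending record(s) from `pending` and re-submit
    }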

I imagine that the alternative implementation I suggested in (1) would also enable more expressive error handlers in Streams, if such a thing was desired. Keeping the record around until after the transaction is committed would enable a DLQ or passing the erroneous record to the error handler.

I think that the current pattern of the application being responsible for providing good data to the producer is very reasonable; having the producer responsible for implementing the application's error handling of bad data is not something I can support.

Thanks,
Greg

[1] https://www.sommarskog.se/error_handling/Part1.html
[2]
https://web.mit.edu/Saltzer/www/publications/endtoend/endtoend.pdf

On Fri, Jul 12, 2024 at 8:52 AM Justine Olshan <jols...@confluent.io.invalid> wrote:

Can we update the KIP to clearly document these decisions?

Thanks,

Justine

On Tue, Jul 9, 2024 at 9:25 AM Andrew Schofield <andrew_schofi...@live.com> wrote:

Hi Chris,
As it stands, the error handling for transactions in KafkaProducer is not ideal. There’s no reason why a failed operation should fail a transaction, provided that the application can tell that the operation was not included in the transaction and then make its own decision whether to continue or back out. So, I think I disagree with the original premise of a client-side error state for a transaction, but we are where we are.

When I voted, I did not expect the KIP to handle ALL errors which could conceivably be handled. I did expect it to handle client-side send errors that would cause a record to be rejected from a batch before sending to a broker. I think that it does make the KafkaProducer interface very slightly more complicated, but the new option is a clear improvement and I don’t see anyone getting into a mess using it.

I think broker-side errors are more tricky and I don’t think an overload on the send() method is going to do the job. I don’t see that as a problem with the KIP, just that the underlying RPCs and behaviour are not very amenable to record-specific error handling. The Produce RPC is a complicated beast which can include a set of records for multiple topic-partitions. Although ProduceResponse v10 does include record errors, I don’t believe this is surfaced in the client. Let’s imagine something like broker-side record validation which barfs on one record. Failing an entire batch is easier, but less useful if the problem is related to one record.

In summary, I’m happy that my vote stands, and I am happy with the KIP only supporting client-side record errors.

Thanks,
Andrew

On 8 Jul 2024, at 16:37, Chris Egerton <chr...@aiven.io.INVALID> wrote:

Hi Alieh,

Can you clarify why broker-side errors shouldn't be covered? The only real rationale I can come up with is that it's easier to implement.

"Things were better for Kafka Streams before KAFKA-9279 was
fixed"
isn't
very convincing, because Kafka Streams is not the only user of
the
Java
producer client. And for others, especially new users, I doubt
that
this
new API we're proposing would make sense without having to
consult a
lot
of
historical context.

I also don't think that most users will know or even care about the distinction between errors that cause a record to fail before it's added to a batch vs. after. If you were writing a producer application of your own, and you wanted to handle RecordTooLargeException instances by dropping a record without aborting a transaction, would you care about whether it was your client or your broker that balked? Would you be happy if you wrote logic expecting that that problem was solved once and for all, only to learn that it could still affect you in other circumstances? Or, alternatively, would you be happy if you wanted to solve that problem and found an API that seemed to do exactly what you wanted, but after reading the fine print, realized you'd have to do it yourself instead?

Ultimately, the more I think about this, the more I believe that we're adding noise to the API (with the new overloaded variant of send) for a feature that will likely bring confusion and even frustration to anyone besides maintainers of Kafka Streams who tries to use it.

If the only concern about covering broker-side errors is that it would be more difficult to implement, I believe we should strongly reconsider that alternative. That said, if there is a straightforward way to explain this feature to new users that won't mislead them or require them to do research on producer internals, then I can still live with it.

Regarding a list of recoverable vs. irrecoverable errors, this is actually the subject of another recently-introduced KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1050%3A+Consistent+error+handling+for+Transactions

Finally, I'd also like to ask the people who have already voted (Andrew, Matthias) if, at the time they voted, they believed that the API would handle all errors, or only the subset of errors that would cause a record to be rejected from a batch before it can be sent to a broker.

Best,

Chris

On Thu, Jul 4, 2024 at 12:43 PM Alieh Saeedi <asae...@confluent.io.invalid> wrote:

Greetings from the KIP’s author


Clarifying two points:


1) Broker-side errors:

As far as I remember, we are not going to cover the errors originating from the broker!

A historical fact: one of the debate points in KIP-1038 was that by defining a producer custom handler, the user may assume that broker-side errors must be covered as well. They may define a handler for handling `RecordTooLargeException` and still see such errors not being handled as they wish.


2) Regarding irrecoverable/recoverable errors:

Before the fix of `KAFKA-9279`, errors such as `RecordTooLargeException` or errors related to missing metadata (both originating from Producer `send()`) were considered recoverable, but after that they turned into being irrecoverable, without changing any Javadocs or having any KIP. All the effort made in this KIP and the former one has been towards returning to the former state.


I am sure that it is clear to you which sort of errors we are going to cover: a single record may happen to NOT get added to the batch due to issues with the record or its corresponding topic. The point was that if the record is not added to the batch, let’s not fail the whole batch because of that non-existing record. We never intended to do something on the broker side or ignore more important errors. But I agree with you, Chris: if we are adding a new API, we must have good documentation for it. The sentence `all irrecoverable transactional errors will still be fatal`, as you suggested, is good. What do you think? I am totally against enumerating errors in Javadocs since these sorts of errors can change over time. Moreover, have you ever seen any list of recoverable or irrecoverable errors anywhere so far?


Bests,

Alieh

On Wed, Jul 3, 2024 at 6:07 PM Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi Justine,

I agree that enumerating a list of errors that should be covered by the KIP is difficult; I was thinking it might be easier if we list the errors that should _not_ be covered by the KIP, and only if we can't define a reasonable heuristic that would cover them without having to explicitly list them. Could it be enough to say "all irrecoverable transactional errors will still be fatal", or even just "all transactional errors (as opposed to errors related to this specific record) will still be fatal"?

Cheers,

Chris

On Wed, Jul 3, 2024 at 11:56 AM Justine Olshan <jols...@confluent.io.invalid> wrote:

Hey Chris,

I think what you say makes sense. I agree that defining the behavior based on code that can possibly change is not a good idea, and I was trying to get a clearer definition from the KIP's author :)

I think it can always be hard to ensure that only specific errors are handled unless they are explicitly enumerated in code, as the code can change and can be changed by folks who are not aware of this KIP or conversation.
I personally don't have the bandwidth to do this definition/enumeration of errors, so hopefully Alieh can expand upon this.

Justine

On Wed, Jul 3, 2024 at 8:28 AM Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi Alieh,

I don't love defining the changes for this KIP in terms of a catch clause in the KafkaProducer class, for two reasons. First, the set of errors that are handled by that clause may shift over time as the code base is modified, and second, it would be fairly opaque to users who want to understand whether an error would be affected by using this API or not.

It also seems strange that we'd handle some types of RecordTooLargeException (i.e., ones reported client-side) with this API, but not others (i.e., ones reported by a broker).

I think this kind of API would be most powerful, most intuitive to users, and easiest to document if we expanded the scope to all record-send-related errors, except anything indicating issues with exactly-once semantics. That would include records that are too large (when caught both client- and server-side), records that can't be sent due to authorization failures, records sent to nonexistent topics/topic partitions, and keyless records sent to compacted topics. It would not include ProducerFencedException, InvalidProducerEpochException, UnsupportedVersionException, and possibly others.

@Justine -- do you think it would be possible to develop either a better definition for the kinds of "excluded" errors that should not be covered by this API, or, barring that, a comprehensive list of exact error types? And do you think this would be acceptable in terms of risk and complexity?

Cheers,

Chris

On Tue, Jul 2, 2024 at 5:05 PM Alieh Saeedi <asae...@confluent.io.invalid> wrote:

Hey Justine,

About the consequences: the consequences will be like when we did not have the fix made in `KAFKA-9279`: silent loss of data! Obviously, when the user intentionally chooses to ignore errors, that would not be silent any more. Right?
Of course, considering all types of `ApiException`s would be too broad. But are the exceptions caught in `catch (ApiException e)` of the `doSend()` method also too broad?

-Alieh

On Tue, Jul 2, 2024 at 9:45 PM Justine Olshan <jols...@confluent.io.invalid> wrote:

Hey Alieh,

If we want to allow any error to be ignored, we should probably run through all the errors to make sure they make sense.
I just want to feel confident that we aren't just making a decision without considering the consequences carefully.

Justine

On Tue, Jul 2, 2024 at 12:30 PM Alieh Saeedi <asae...@confluent.io.invalid> wrote:

Hey Justine,

yes, we talked about `RecordTooLargeException` as an example, but did we ever limit ourselves to only this specific exception? I think neither in the KIP nor in the PR. As Chris mentioned, this KIP is going to undo what we have done in `KAFKA-9279` in case 1) the user is in a transaction and 2) he decides to ignore the errors in which the record was not even added to the batch. Yes, and we suggested some methods for undoing or, in fact, moving back the transaction from the error state in `flush` or in `commitTnx`, and we finally came to the idea of not even doing the changes (better than undoing) in `send`.
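In other words, the intended usage would roughly look like this (just a sketch based on the POC PR; the names may still change):

    producer.beginTransaction();
    // hypothetical overload: a client-side send() failure for this record
    // is still reported via the returned Future, but does NOT transition
    // the transaction into the error state
    producer.send(record, TxnSendOption.IGNORE_SEND_ERRORS);
    producer.commitTransaction(); // commits the remaining records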

Bests,
Alieh

On Tue, Jul 2, 2024 at 8:03 PM Justine Olshan <jols...@confluent.io.invalid> wrote:

Hey folks,

I understand where you are coming from by asking for specific use cases. My understanding based on previous conversations was that there were a few different errors that have been seen.
One example I heard some information about was when the record was too large and it fails the batch. Besides that, I'm not really sure if there are cases in mind, though it is fair to ask on those and bring them up.

Does a record qualify as a poison pill if it targets a topic that doesn't exist? Or if it targets a topic that the producer principal lacks ACLs for? What if it fails broker-side validation (e.g., has a null key for a compacted topic)?

I think there was some parallel work with addressing the UnknownTopicOrPartitionError in another way. As for the other checks, ACLs, validation, etc., I am not aware of that being in Alieh's scope, but we should be clear about exactly what we are doing.

All errors that fall into ApiException seem too broad to me.

Justine

On Tue, Jul 2, 2024 at 10:51 AM Alieh Saeedi <asae...@confluent.io.invalid> wrote:

Hey Chris,
thanks for sharing your concerns.

1) About the language of the KIP (or maybe later in Javadocs): is it alright if I write: all errors that fall into the `ApiException` category thrown (actually returned) by the Producer?
2) About future expansion: do you have any better suggestions for enum names? Do you think `IGNORE_API_EXCEPTIONS` or something like that is a "better/more accurate" one?

Bests,
Alieh

On Tue, Jul 2, 2024 at 7:29 PM Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi Alieh and Justine,

I'm concerned that we're settling on a definition of "poison pill" that's easiest to tackle right now but may lead to shortcomings down the road. I understand the relationship between this KIP and KAFKA-9279, and I can totally get behind the desire to keep things small, focused, and simple in the name of avoiding bugs. However, what I don't think is clear at all is what the "specific circumstances" are that Justine mentioned. I had a drastically different idea of what the intended behavioral change would be before looking at the draft PR.

I would like 1) for us to be clearer about the categories of errors that we want to cover with this new API (especially since we'll have to find a clear, succinct way to document this for users), and 2) to make sure that if we do try to expand this API in the future, we won't be painted into a corner.

For item 1, hopefully we can agree that the language in the KIP for IGNORE_SEND_ERRORS ("The records causing irrecoverable errors are excluded from the batch and the transaction is committed successfully.") is pretty vague. If we start using the phrase "poison pill record" that could help, but IMO more detail would still be needed. We know that we want to include records that are so large that they can be immediately rejected by the producer. But there are other cases that users might expect to be handled. Does a record qualify as a poison pill if it targets a topic that doesn't exist? Or if it targets a topic that the producer principal lacks ACLs for? What if it fails broker-side validation (e.g., has a null key for a compacted topic)?

For item 2, this really depends on how narrow the scope of what we're doing right now is. If we only handle a subset of the examples I laid out above that could possibly be considered poison pills with this KIP, do we want to lock ourselves in to never addressing more in the future, or can we choose an API (probably just enum names would be the only important decision here) that leaves room for more later?

Best,

Chris



On Tue, Jul 2, 2024 at 12:28 PM Justine Olshan <jols...@confluent.io.invalid> wrote:

Chris and Alieh,

My understanding is that this KIP is really only trying to solve an issue of a "poison pill" record that fails send().
We've talked a lot about having a generic framework for all errors, but I don't think that is what this KIP is trying to do. Essentially the request is to undo the change from KAFKA-9279 <https://issues.apache.org/jira/browse/KAFKA-9279> but under specific circumstances that are controlled. I really am concerned about opening new avenues for bugs with EOS and hesitate to handle any other types of errors.
I think if we all agree on the problem that we are trying to solve, it is easier to agree on solutions.

Justine

On Mon, Jul 1, 2024 at 2:20 AM Alieh Saeedi <asae...@confluent.io.invalid> wrote:

Hi Matthias,
Thanks for the valid points you mentioned. I updated the KIP and the PR with:
1) mentioning that the new overloaded `send` throws `IllegalStateException` if the user tries to ignore `send()` errors outside of a transaction.
2) the default implementation in the `Producer` interface throws an `UnsupportedOperationException`

Hi Chris,
Thanks for the feedback. I tried to clarify the points you listed:

-------> we've narrowed the scope from any error that might take place with producing a record to Kafka, to only the ones that are thrown directly from Producer::send;

From the very beginning, and even since KIP-1038, the main purpose was to have "more flexibility," or, in other words, "giving the user the authority" to handle some specific exceptions thrown from the `Producer`. Due to the specific cases we had in mind, KIP-1038 was discarded and we decided to not define a `CustomExceptionHandler` for the `Producer` and instead treat the `send` failures in a different way. The main issue is that `send` makes a transition to the error state, which is undoable. In fact, one single poison pill record makes the whole batch fail. The former suggestions that you agreed with have all been about undoing this transition in `flush` or `commit`. The new suggestion is to undo (or better, NOT do) it in `send`, due to the reasons listed in the discussions above. Moreover, I would say that having such a large scope as you mentioned is impossible. In the best case, we may have control over the `Producer`. What shall we do with the broker? The "any error that might take place with producing a record to Kafka" is too much, I think.

-------> is this all we want to handle, and will it prevent us from handling more in the future in an intuitive way?

I think yes. This is all we want. Other sorts of errors, such as having problems with partition addition, producer-fenced exceptions, etc., seem to be more serious issues. The intention was to handle problems created by (maybe) a single poison pill record. BTW, I do not see any obstacles to future changes.

Bests,
Alieh

On Sat, Jun 29, 2024 at 3:03 AM Chris Egerton <chr...@aiven.io.invalid> wrote:

Ah, sorry--spoke too soon. The PR doesn't show that errors thrown from Producer::send are handled, but instead, ApiException instances that are caught inside KafkaProducer::doSend and are handled by returning an already-failed future are. I think the same question still applies (is this all we want to handle, and will it prevent us from handling more in the future in an intuitive way), though.

On Fri, Jun 28, 2024 at 8:57 PM Chris Egerton <chr...@aiven.io> wrote:

Hi Alieh,

This KIP has evolved a lot since I last looked at it, but the changes seem well thought-out both in semantics and API. One clarifying question I have is that it looks, based on the draft PR, like we've narrowed the scope from any error that might take place with producing a record to Kafka, to only the ones that are thrown directly from Producer::send; is that the intended behavior here? And if so, do you have thoughts on how we might design a follow-up KIP that would catch all errors (including ones reported asynchronously instead of synchronously)? I'd like it if we could leave the door open for that without painting ourselves into too much of a corner with the API design for this KIP.

Cheers,

Chris

On Fri, Jun 28, 2024 at 6:31 PM Matthias J. Sax <mj...@apache.org> wrote:

Thanks Alieh,

it seems this KIP can just pick between a couple of tradeoffs. Adding an overloaded `send()` as the KIP proposes makes sense to me and seems to provide the cleanest solution compared to the other options we discussed.

Given the explicit name of the passed-in option that highlights that the option is for TX only, it is pretty clear and avoids the issue of `flush()` ambiguity.

Nit: We should make clear in the KIP, though, that the new `send()` overload would throw an `IllegalStateException` if TX are not used (similar to other TX methods like initTx(), etc.)


About the `Producer` interface, I am not sure how this was done in the past (e.g., KIP-266 added `Consumer.poll(Duration)` w/o a default implementation), i.e., if we need a default implementation for backward compatibility or not? If we do want to add one, I think it would be appropriate to throw an `UnsupportedOperationException` by default, instead of just keeping the default impl empty?
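Something like this (just a sketch; the exact signature is whatever the KIP ends up defining):

    // default implementation on the `Producer` interface, so existing
    // third-party implementations keep compiling
    default Future<RecordMetadata> send(ProducerRecord<K, V> record,
                                        TxnSendOption option) {
        throw new UnsupportedOperationException();
    }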


My points are rather minor, and should not block this KIP though. Overall LGTM.



-Matthias

On 6/27/24 1:28 PM, Alieh Saeedi wrote:
Hi Justine,

Thanks for the suggestion.
Making applications validate every single record is not the best way, from an efficiency point of view.
Moreover, between changing the behavior of the Producer in `send` and `commitTnx`, the former seems more reasonable and clean.

Bests,
Alieh

On Thu, Jun 27, 2024 at 8:14 PM Justine Olshan <jols...@confluent.io.invalid> wrote:

Hey Alieh,

I see there are two options now. So folks will be discussing the approaches and deciding the best way forward before we vote?
I do think there could be a problem with the approach on commit if we get stuck on an earlier error and have more records (potentially on new partitions) to commit, as the current PR is implemented.

I guess this takes us back to the question of whether the error should be cleared on send.

(And I guess at the back of my mind, I'm wondering if there is a way we can validate the "poison pill" records application-side before we even try to send them)

Justine

On Wed, Jun 26, 2024 at 4:38 PM Alieh Saeedi <asae...@confluent.io.invalid> wrote:

Hi Justine,

I did not update the KIP with `TxnSendOption` since I thought it'd be better discussed here beforehand.
Right now, there are 2 PRs:
- the PR that implements the current version of the KIP: https://github.com/apache/kafka/pull/16332
- the POC PR that clarifies the `TxnSendOption`: https://github.com/apache/kafka/pull/16465
https://github.com/apache/kafka/pull/16465

Bests,
Alieh

On Thu, Jun 27, 2024 at 12:42 AM Justine Olshan <jols...@confluent.io.invalid> wrote:

Hey Alieh,

I think I am a little confused. Are the 3 points above addressed by the KIP, or did something change? The PR seems to not include this change and still has the CommitOption as well.

Thanks,
Justine

On Wed, Jun 26, 2024 at 2:15 PM Alieh Saeedi <asae...@confluent.io.invalid> wrote:

Hi all,

Looking at the PR <https://github.com/apache/kafka/pull/16332> corresponding to the KIP, there are some points worthy of mention:

1) Clearing the error ends up in dirty/messy code in `TransactionManager`.

2) By clearing the error, we are actually doing an illegal transition from `ABORTABLE_ERROR` to `IN_TRANSACTION`, which is conceptually not acceptable. This can be the root cause of some issues, with perhaps further future changes by others.

3) If the poison pill record `r1` causes a transition to the error state and then the next record `r2` requires adding a partition to the transaction, the action fails due to being in the error state. In this case, clearing errors during `commitTnx(CLEAR_SEND_ERROR)` is too late. However, this case can NOT be the main concern as soon as KIP-890 is fully implemented.

My suggestion is to solve the problem where it arises. If the transition to the error state does not happen during `send()`, we do not need to clear the error later. Therefore, instead of `CommitOption`, we can define a `TxnSendOption` and prevent the `send()` method from going to the error state in case 1) we're in a transaction and 2) the user asked for IGNORE_SEND_ERRORS. For more clarity, you can take a look at the POC PR <https://github.com/apache/kafka/pull/16465>.

Cheers,
Alieh



























