I am OK either way (i.e., flush or commit), but I think we need to define the
exact semantics, and there is a subtle point to consider:
1) flush(Options)
Example:
send(...)
send(...)
flush(ignoreErrors)
// at this point, we know that all Futures are completed and all
// Callbacks are executed, and we can assume that all user code checking
// for errors did execute, before `commitTx` is called
// I consider this option safe
commitTx()
2) commitTx(Option)
Example:
send(...)
send(...)
// when commitTx is called, there are still pending Futures and not
// all Callbacks have executed yet -- with the implicit flush, we know that
// all Callbacks are executed, but even in this case, the user could only
// throw an exception inside the Callback to stop the TX from eventually
// committing -- Futures cannot be used to decide whether to ignore errors
// and commit or not.
// I do not consider this option safe
commitTx(ignoreErrors)
3a) required flush + commitTx(Option)
Example:
send(...)
send(...)
flush()
// at this point, we know that all Futures are completed and all
// Callbacks are executed, and we can assume that all user code checking
// for errors did execute, before `commitTx` is called
// I consider this option safe
commitTx(ignoreErrors)
3b) missing flush + commitTx(Option)
Example:
send(...)
send(...)
// as flush() was not called explicitly, we should ignore the
// `ignoreErrors` flag and always throw an exception if the producer is in
// error state, because we cannot be sure that the user did all required
// checks for error handling
commitTx(ignoreErrors)
The only issue with option (3) is that it's more complex and the semantics
are more subtle. But it might be a good (necessary?) bridge between
(1) and (2): (3) is semantically sound (we ignore errors via passing a
flag into commitTx() instead of flush()), and at the same time safe (we
force users to explicitly flush() and [hopefully] do proper error
handling, and don't rely on an implicit flush() during commitTx(), which
might be error prone).
(Also need to find a good and descriptive name for the flag we pass into
`commitTx()` for this case.)
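To make the difference between (3a) and (3b) concrete, here is a minimal, self-contained toy model. It is not the KafkaProducer API: `ToyTxProducer`, `commitTx(boolean ignoreErrors)` and the convention that records named "bad..." fail are all hypothetical. It only illustrates the proposed rule that the `ignoreErrors` flag is honored if, and only if, `flush()` was called explicitly after the last `send()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of option (3); NOT the KafkaProducer API.
class FlushGatedCommit {

    static final class ToyTxProducer {
        private final List<String> pending = new ArrayList<>();   // sent, not yet completed
        private final List<String> inTx = new ArrayList<>();      // completed OK, part of the open TX
        private final List<String> committed = new ArrayList<>(); // committed records
        private RuntimeException lastSendError;                   // "historical" send error
        private boolean flushedSinceLastSend = true;

        // Asynchronous in the real producer; here the record is only queued.
        void send(String record) {
            pending.add(record);
            flushedSinceLastSend = false;
        }

        // Completes every pending send. Records whose name starts with "bad"
        // fail; the failure is remembered (like a failed Future), not thrown.
        void flush() {
            for (String r : pending) {
                if (r.startsWith("bad")) {
                    lastSendError = new IllegalStateException("send failed: " + r);
                } else {
                    inTx.add(r);
                }
            }
            pending.clear();
            flushedSinceLastSend = true;
        }

        // (3a): flag honored because the user flushed explicitly.
        // (3b): no explicit flush, so the flag is ignored and any send error is thrown.
        void commitTx(boolean ignoreErrors) {
            boolean honorIgnore = ignoreErrors && flushedSinceLastSend;
            flush(); // implicit flush, as commitTransaction() performs today
            if (lastSendError != null && !honorIgnore) {
                throw lastSendError; // error is kept: the toy TX stays in error state
            }
            committed.addAll(inTx);
            inTx.clear();
            lastSendError = null;
        }

        List<String> committed() {
            return committed;
        }
    }
}
```

Note that the decision is taken before the implicit flush inside `commitTx`, so that flush cannot "launder" a missing explicit one.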
-Matthias
On 6/24/24 8:51 AM, Andrew Schofield wrote:
Hi Chris,
That works for me too. I slightly prefer an option on flush(), but what you
suggested works too.
Thanks,
Andrew
On 24 Jun 2024, at 15:14, Chris Egerton <chr...@aiven.io.INVALID> wrote:
Hi Andrew,
I like a lot of what you said, but I still believe it's better to override
commitTransaction than flush. Users will already have to manually opt in to
ignoring errors encountered during transactions, and we can document
recommended usage (i.e., explicitly invoking flush() before invoking
commitTransaction(ignoreRecordErrors)) in the newly-introduced method. I
don't believe it's worth the increased cognitive load on users with
non-transactional producers to introduce an overloaded flush() variant.
Cheers,
Chris
On Mon, Jun 24, 2024 at 9:39 AM Andrew Schofield <andrew_schofi...@live.com> wrote:
Hi Alieh,
Thanks for driving this. Unfortunately, there are many parts of the API which are a bit unfortunate, and it’s tricky to make small improvements that don’t have downsides.
I don’t like the idea of using a configuration because configuration is often outside the application, and changing the behaviour of someone else’s application without understanding it is risky. Anything which embeds a transactional producer could have its behaviour changed unexpectedly.
It would have been much nicer if send() didn’t fail silently and change the transaction state. But, because it’s an asynchronous operation, I don’t really think we can just make it throw all exceptions, even though I really think that `send()` is the method with the problem here.
The contract of `flush()` is that it makes sure that all preceding sends will have completed, so it should be true that a well-written application would be able to know which records were OK because of the Future<RecordMetadata> returned by the `send()` method. It should be able to determine whether it wants to commit the transaction even if some of the intended operations didn’t succeed.
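The point about `flush()` and `Future<RecordMetadata>` can be sketched with plain `java.util.concurrent` types. This is a hedged sketch: `SendOutcomes.failedIndices` is a hypothetical helper, and in the test a `CompletableFuture` stands in for the futures that `KafkaProducer#send()` returns. It assumes `flush()` has already returned, so every future is done and `get()` does not block.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical helper: classify send() outcomes after flush() has returned.
class SendOutcomes {

    // Returns the positions of futures that completed exceptionally.
    // Assumes flush() has already returned, so every future is done.
    static <T> List<Integer> failedIndices(List<? extends Future<T>> futures) {
        List<Integer> failed = new ArrayList<>();
        for (int i = 0; i < futures.size(); i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) {
                throw new IllegalStateException("future " + i + " is not complete; call flush() first");
            }
            try {
                f.get(); // returns immediately because the future is already done
            } catch (ExecutionException e) {
                failed.add(i); // e.getCause() holds the send error
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while reading a completed future", e);
            }
        }
        return failed;
    }
}
```

An application could use such a list to decide, record by record, whether the transaction is still worth committing.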
What we don’t currently have is a way for the application to say to the KafkaProducer that it knows the outcome of sending the records and to confirm that it wants to proceed. Then it would not be necessary for `commitTransaction()` to throw an exception to report a historical error which the application might choose to ignore.
Having read the comments, I think the KIP is on the right lines focusing on the `flush()` method. My suggestion is that we introduce an option on `flush()` to be used before `commitTransaction()` for applications that want to be able to commit transactions which had known failed operations.
The code would be:
producer.beginTransaction();
future1 = producer.send(goodRecord1);
future2 = producer.send(badRecord); // The future from this call will complete exceptionally
future3 = producer.send(goodRecord2);
producer.flush(FlushOption.TRANSACTION_READY);
// At this point, we know that all 3 futures are complete and the
// transaction contains 2 records
producer.commitTransaction();
I wouldn’t deprecate `flush()` with no option. It just uses the default option, which behaves like today.
Why did I suggest an option on `flush()` rather than `commitTransaction()`? Because with `flush()`, it’s clear when the application is stating that it’s seen all of the results from its `send()` calls and it’s ready to proceed. If it has to rely on flushing that occurs inside `commitTransaction()`, I don’t see it as clear-cut.
Thanks,
Andrew
On 24 Jun 2024, at 13:44, Alieh Saeedi <asae...@confluent.io.INVALID> wrote:
Hi all,
Thanks for the interesting discussion.
I assume that now the main questions are as follows:
1. Do we need to transition the transaction to the error state for API exceptions?
2. Should we throw the API exception in `send()` instead of returning a future error?
3. If the answer to question (1) is NO and to question (2) is YES, do we need to change the current `flush` or `commitTx` at all?
Cheers,
Alieh
On Sat, Jun 22, 2024 at 3:21 AM Matthias J. Sax <mj...@apache.org> wrote:
Hey Kirk,
can you elaborate on a few points?
Otherwise users would have to know to explicitly change their code to
invoke flush().
Why? If we would add an option to `flush(FlushOption)`, the existing
`flush()` w/o any option will still be there, right? If we would really
deprecate existing `flush()`, it would just mean that we would pass
"default FlushOption" into an implicit flush (and yes, we would need to
define what this would be).
I think there is no clear winner (as pointed out in my last reply), and both `flush(FlushOption)` and `commitTx(CommitOption)` have advantages and drawbacks. I guess we just need to agree on which tradeoff we want to move forward with?
Not sure if your database example is a 1:1 fit? I think the better comparison would be:
BEGIN TX;
INSERT INTO foo VALUES ('a');
INSERT INTO foo VALUES ('b');
INSERT INTO foo VALUES ('c');
INSERT INTO foo VALUES ('not sure');
For this case, the full TX would roll back, right? I still think that allowing users to just skip over the last error and continue the TX would be OK. In the end, we provide a programmatic API, not a declarative one like SQL. Of course, the default behavior would still be to put the producer into error state, and the user would need to call `abortTransaction()` to move forward.
-Matthias
On 6/21/24 5:26 PM, Kirk True wrote:
Hi Matthias,
On Jun 21, 2024, at 12:28 PM, Matthias J. Sax <mj...@apache.org> wrote:
If we want to limit it to `RecordTooLargeException`, throwing from `send()` directly makes sense. Thanks for calling it out.
It's still a question of backward compatibility? `send()` does throw exceptions already, including generic `KafkaException`. Not sure if this helps with backward compatibility? Could we just add a new exception type (which is a child of `KafkaException`)?
The Producer JavaDocs are not totally explicit about it IMHO.
I think we could expect that some generic error handling path gets executed. For the TX case, I would assume that a TX would be aborted if `send()` throws, or that the producer would be `closed()`. Overall this might be safe?
It would be a little less flexible though, since (as you note) it would still be impossible to commit transactions after errors have been reported from brokers.
KS would still need a way to clear the error state of the producer. We could catch a `RecordTooLargeException` from `send()`, call the handler and let it decide what to do next. But if it does return `CONTINUE` to swallow the error and drop the poison pill record on the floor, we would want to move forward and commit the transaction.
But the question is: if we cannot add a record to the TX, does the producer need to go into error state? In the end, we did throw and inform the app that the record was _not_ added, and it's up to the app to decide what to do next?
That’s an excellent question…
Imagine the user’s application is writing information to a database instead of Kafka. If there’s a table with a CHAR(1) column and this SQL statement was attempted, what should happen?
INSERT INTO foo VALUES ('not sure');
Yes, that DML would fail, sure, but would the user expect that the connection used by the database library would get stuck in some kind of error state? A user would be able to catch the error and either continue or abort, based on their business rules.
So I agree with what I believe you’re implying: we shouldn’t poison the Producer/TransactionManager on certain types of application-level errors in send().
Kirk
If we report the error only via the `Callback`, it's a different story, because the contract for this case is clearly specified in the JavaDocs:
When used as part of a transaction, it is not necessary to define a callback or check the result of the future in order to detect errors from <code>send</code>. If any of the send calls failed with an irrecoverable error, the final {@link #commitTransaction()} call will fail and throw the exception from the last failed send. When this happens, your application should call {@link #abortTransaction()} to reset the state and continue to send data.
-Matthias
On 6/21/24 11:42 AM, Chris Egerton wrote:
Hi Artem,
I think it'd make sense to throw directly from send whenever possible, instead of returning an already-completed future. I didn't do that in my bug fix to try to be conservative about breaking changes, but this seems to have caused its own set of headaches. It would be a little less flexible though, since (as you note) it would still be impossible to commit transactions after errors have been reported from brokers.
I'll leave it up to the Kafka Streams folks to decide if that flexibility is required. If it is, then users could explicitly call flush() before committing (and ignoring errors for) or aborting a transaction, if they want to implement fine-grained error handling logic such as allowing errors for a subset of topics to be ignored.
Hi Matthias,
Most of the time you're right and we can't throw from send(); however, in this case (client-side record-too-large exception), the error is actually noticed by the producer before send() returns, so it should be possible to throw directly.
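A toy model of throwing client-side errors directly from `send()` might look like the following. All names (`ToyTxProducer`, `ToyRecordTooLargeException`, the `brokerWillReject` parameter) are hypothetical, and string length stands in for serialized record size. The sketch only shows the idea under discussion: a synchronous throw leaves the transaction usable, while errors that can only be known later (broker-side) still put the transaction into an error state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of "throw client-side errors directly from send()".
class ThrowFromSend {

    static final class ToyRecordTooLargeException extends RuntimeException {
        ToyRecordTooLargeException(String msg) {
            super(msg);
        }
    }

    static final class ToyTxProducer {
        private final int maxRecordSize;
        private final List<String> inTx = new ArrayList<>();
        private final List<String> committed = new ArrayList<>();
        private RuntimeException fatalError; // broker-side error: poisons the TX

        ToyTxProducer(int maxRecordSize) {
            this.maxRecordSize = maxRecordSize;
        }

        void send(String value, boolean brokerWillReject) {
            // Client-side check happens before send() returns: throw directly
            // and leave the transaction state untouched.
            if (value.length() > maxRecordSize) {
                throw new ToyRecordTooLargeException(
                        "record of size " + value.length() + " exceeds " + maxRecordSize);
            }
            // Broker-side failures are only known later, so they still
            // transition the producer into an error state.
            if (brokerWillReject) {
                fatalError = new IllegalStateException("broker rejected: " + value);
            } else {
                inTx.add(value);
            }
        }

        void commitTransaction() {
            if (fatalError != null) {
                throw fatalError;
            }
            committed.addAll(inTx);
            inTx.clear();
        }

        List<String> committed() {
            return committed;
        }
    }
}
```

With this split, a caller such as Kafka Streams could catch the synchronous exception, drop the record, and still commit.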
Cheers,
Chris
On Fri, Jun 21, 2024, 14:25 Matthias J. Sax <mj...@apache.org> wrote:
Not sure if we can change send and make it throw, given that send() is async? That is why users can register a `Callback` to begin with, right?
And Alieh's point about backward compatibility is also a fair concern.
Actually, this would potentially be even worse than the original buggy behavior because the bug was that we ignored errors that happened in the "send()" method itself, not necessarily the ones that we got from the broker.
My understanding was that `commitTx(swallowError)` would only swallow `send()` errors, not errors about the actual commit. I agree that it would be very bad to swallow errors about the actual TX commit...
It's a fair question if this might be too subtle; to make it explicit, we could use `CommitOptions#ignorePendingSendErrors()` [working name] to make it clear.
If we think it's too subtle to change commit to swallow send() errors, maybe going with `flush(FlushOptions)` would be clearer (and we can use `FlushOption#swallowSendErrorsForTransactions()` [working name] to be explicit that the `FlushOption` for now only has an effect for TX).
Thoughts?
Thoughts?
-Matthias
On 6/21/24 4:10 AM, Alieh Saeedi wrote:
Hi all,
It is very exciting to see all the experts here raising very good points. As we go further, we see more and more options to improve our solution, which makes concluding and updating the KIP impossible.
The main suggestions so far are:
1. `flush` with `flushOptions` as input parameter
2. `commitTx` with `commitOptions` as input parameter
3. `send` must throw the exception
My concerns about the 3rd suggestion:
1. Does the change cause any issue with backward compatibility?
2. The `send (bad record)` already transitions the transaction to the error state. No user, including Streams, is able to transition the transaction back from the error state. Do you mean we remove the `maybeTransitionToErrorState(e)` from here <https://github.com/apache/kafka/blob/9b5b434e2a6b2d5290ea403fc02859b1c523d8aa/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1112> as well?
Cheers,
Alieh
On Fri, Jun 21, 2024 at 8:45 AM Andrew Schofield <andrew_schofi...@live.com> wrote:
Hi Artem,
I think you make a good point which is worth further
consideration.
If
any of the existing methods is really ripe for a change here, it’s
the
send() that actually caused the problem. If that can be fixed so
there
are
no situations in which a lurking error breaks a transaction, that
might
be
the best.
Thanks,
Andrew
On 21 Jun 2024, at 01:51, Artem Livshits <alivsh...@confluent.io.INVALID> wrote:
I thought we still wait for requests (and their errors) to come in and could handle fatal errors appropriately.
We do wait for requests, but my understanding is that when we call commitTransaction("ignore send errors") we want to ignore errors. So if we do
1. send
2. commitTransaction("ignore send errors")
the commit will succeed. You can look at the example in https://issues.apache.org/jira/browse/KAFKA-9279 and just substitute commitTransaction with commitTransaction("ignore send errors") and we get the buggy behavior back :-). Actually, this would potentially be even worse than the original buggy behavior because the bug was that we ignored errors that happened in the "send()" method itself, not necessarily the ones that we got from the broker.
Actually, looking at https://github.com/apache/kafka/pull/11508/files, wouldn't a better solution be to just throw the error from the "send" method itself, rather than trying to set it to be thrown during commit? This way the example in https://issues.apache.org/jira/browse/KAFKA-9279 would be fixed, and at the same time it would give KS an opportunity to catch the error and ignore it if needed. Not sure if we need a KIP for that; just do a better fix of the old bug.
-Artem
On Thu, Jun 20, 2024 at 4:58 PM Justine Olshan <jols...@confluent.io.invalid> wrote:
I'm a bit late to the party, but the discussion here looks reasonable. Moving the logic to a transactional method makes sense to me and makes me feel a bit better about keeping the complexity in the methods relevant to the issue.
One minor concern is that if we set the "ignore send errors" (or whatever we decide to name it) option without explicit flush, it'll actually lead to broken behavior, as the application won't be able to stop a commit from proceeding even on fatal errors.
Is this with respect to the case where a request is still in flight when we call commitTransaction? I thought we still wait for requests (and their errors) to come in and could handle fatal errors appropriately.
Justine
On Thu, Jun 20, 2024 at 4:32 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:
Hi Matthias (and other folks who suggested ideas),
maybe `commitTransaction(CommitOptions)` or similar could be a good way forward?
I like this approach. One minor concern is that if we set the "ignore send errors" (or whatever we decide to name it) option without explicit flush, it'll actually lead to broken behavior, as the application won't be able to stop a commit from proceeding even on fatal errors. But I guess we'll just have to clearly document it.
In some way we are basically adding a flag to optionally restore the https://issues.apache.org/jira/browse/KAFKA-9279 bug, which is the motivation for all these changes, anyway :-).
-Artem
On Thu, Jun 20, 2024 at 2:18 PM Matthias J. Sax <mj...@apache.org> wrote:
Seems the option to use a config does not get a lot of support. So we need to go with some form of "overload / new method". I think Chris' point about not coupling it to `flush()` but rather to `commitTransaction()` is actually a very good one; for the non-TX case, the different flush variants would not make sense.
I also like Lianet's idea to pass in some "options" object, so maybe `commitTransaction(CommitOptions)` or similar could be a good way forward? It's much better than a `boolean` parameter, aesthetically, as it is extendable in the future if necessary.
Given that we would pass in an optional parameter, we might not even need to deprecate the existing `commitTransaction()` method?
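An options object in the spirit of `commitTransaction(CommitOptions)` could be sketched as below. `CommitOptions`, `ignorePendingSendErrors` (a working name from this thread) and `ToyTxProducer` are hypothetical, and `recordSendError` merely stands in for a failed `send()`. The sketch shows why the existing no-arg method would not need deprecation: it can delegate with default options that keep today's behavior.

```java
// Hypothetical sketch; names are working names from this thread, not a Kafka API.
class CommitOptionsSketch {

    static final class CommitOptions {
        private boolean ignorePendingSendErrors = false; // default keeps today's behavior

        static CommitOptions defaults() {
            return new CommitOptions();
        }

        // Fluent setter; more options can be added later without new overloads.
        CommitOptions ignorePendingSendErrors(boolean ignore) {
            this.ignorePendingSendErrors = ignore;
            return this;
        }

        boolean ignorePendingSendErrors() {
            return ignorePendingSendErrors;
        }
    }

    static final class ToyTxProducer {
        private RuntimeException lastSendError;
        private int commits;

        // Stands in for a send() whose Future completed exceptionally.
        void recordSendError(RuntimeException e) {
            lastSendError = e;
        }

        // Existing signature keeps its semantics by delegating with defaults.
        void commitTransaction() {
            commitTransaction(CommitOptions.defaults());
        }

        void commitTransaction(CommitOptions options) {
            if (lastSendError != null && !options.ignorePendingSendErrors()) {
                throw lastSendError;
            }
            lastSendError = null;
            commits++;
        }

        int commits() {
            return commits;
        }
    }
}
```

Compared with a `boolean` overload, adding a second option later only means adding one more setter/getter pair, not another method signature.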
-Matthias
On 6/20/24 9:12 AM, Andrew Schofield wrote:
Hi Alieh,
Thanks for the KIP.
I *really* don’t like adding a config which changes the behaviour of the flush() method. We already have too many configs. But I totally understand the problem that you’re trying to solve, and some of the other suggestions in this thread seem neater.
Personally, I would add another method to KafkaProducer. Not an overload on flush() because this is not flush() at all. Using Matthias’s options, I prefer (3).
Andrew
On 20 Jun 2024, at 15:08, Lianet M. <liane...@gmail.com>
wrote:
Hi all, thanks for the KIP Alieh!
LM1. Totally agree with Artem's point about the config not being the most explicit/flexible way to express this capability. Turning to Matthias' 4 options, what I don't like about 3 and 4 is that it seems they might not age very well? Aren't we going to be wanting some other twist to the flush semantics that will have us adding yet another param to it, or another overloaded method? I truly don't have the context to answer that, but if it feels like a realistic future, maybe adding some kind of FlushOptions param to the flush would be better from an extensibility point of view. It would only have the clearErrors option available for now but could accept any other we may need. I find that this would remove the "ugliness" Matthias pointed out for 3. and 4.
LM2. No matter how we end up expressing the different semantics for flush, let's make sure we update the KIP on the flush and commitTransaction JavaDocs. It currently states that flush "clears the last exception" and commitTransaction "will NOT throw" if called after flush, but it really all depends on the config/options/method used.
LM3. I find it would be helpful to include an example to show the new flow that we're unblocking (I see this as the great gain here): flush with clear error option enabled -> catch and do whatever error handling we want -> commitTransaction successfully.
Thanks!
Lianet
On Wed, Jun 19, 2024 at 11:26 PM Chris Egerton <fearthecel...@gmail.com> wrote:
Hi Matthias,
I like the alternatives you've listed. One more that might help is if, instead of overloading flush(), we overloaded commitTransaction() to something like commitTransaction(boolean tolerateRecordErrors). This seems slightly cleaner in that it takes the behavioral change we want, which only applies to transactional producers, to an API method that is only used for transactional producers. It would also avoid the issue of whether or not flush() (or a new variant of it with altered semantics) should throw or not. Thoughts?
Hi Alieh,
Thanks for the KIP, I like this direction a lot more than the pluggable handler!
I share Artem's concerns that enabling this behavior via configuration doesn't seem like a great fit. It's likely that application code will be written in a style that only works with one type of behavior from transactional producers, so requiring that application code to declare its expectations for the behavior of its producer seems more appropriate than, e.g., allowing users deploying that application to tweak a configuration file that gets fed to producers spun up inside it.
Cheers,
Chris
On Wed, Jun 19, 2024 at 10:32 PM Matthias J. Sax <mj...@apache.org> wrote:
Thanks for the KIP Alieh. I actually like the KIP as-is, but think Artem raises very good points...
Seems we have four options on how to move forward?
1. add config to allow "silent error clearance" as the KIP proposes
2. change flush() to clear error and let it throw
3. add new `flushAndThrow()` (or better name) which clears error and throws
4. add `flush(boolean clearAndThrow)` and let user pick (and deprecate existing `flush()`)
For (2), given that it would be a behavior change, we might also need a public "feature flag" config.
It seems both (1) and (2) have the issue Artem mentioned. (3) and (4) would be safer to this end; however, for both we kinda get an ugly API?
Not sure right now if I have any preference. Seems we need to pick some evil and that there is no clear best solution? Would be good to hear from others what they think.
-Matthias
On 6/18/24 8:39 PM, Artem Livshits wrote:
Hi Alieh,
Thank you for the KIP. I have a couple of suggestions:
AL1. We should throw an error from flush after we clear it. This would make it so that both "send + commit" and "send + flush + commit" (the latter looks like just a more verbose way to express the former, and it would be intuitive if it behaves the same) would throw if the transaction has an error (so if the code is written either way, it's going to be correct). At the same time, the latter could be extended by the caller to intercept exceptions from flush, ignore as needed, and commit the transaction. This solution would keep basic things simple (if someone has code that doesn't require advanced error handling, then basic "send + flush + commit" would do the right thing) and advanced things possible: an application can add try + catch around flush and ignore some errors.
AL2. I'm not sure if config is the best way to express the modification of the "flush" semantics -- the application logic that calls "flush" needs to match the "flush" semantics, and configuring semantics in a detached place creates room for bugs due to discrepancies. This can be especially bad if the producer loads configuration from a file at run time; in that case, a mistake in configuration could break the application because it was written to expect one "flush" semantics but the semantics is switched. Given that the "flush" semantics needs to match the caller's expectation, a way to accomplish that would be to pass the caller's expectation to the "flush" call by either having a method with a different name or having an overload with a Boolean flag that would configure the semantics (the current method could just redirect to the new one).
-Artem
On Mon, Jun 17, 2024 at 9:09 AM Alieh Saeedi <asae...@confluent.io.invalid> wrote:
Hi all,
I'd like to kick off a discussion for KIP-1059, which suggests adding a new feature to the Producer flush() method.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1059%3A+Enable+the+Producer+flush%28%29+method+to+clear+the+latest+send%28%29+error
Cheers,
Alieh