I was also curious about this text:
The new method clears the latest error produced by `send(ProducerRecord)`
and transits the transaction back from the error state

I agree. We should not say "latest" but "any".



Is it fair to say that we expect to encounter only one send response with
an error that we clear on commitTransaction? Is that solving the problem?

Yes, I think that is an accurate description.


Speaking of documentation, is it confusing that the name of the KIP is no
longer consistent with the approach the KIP takes?

Sounds like an easy fix (we should not update the subject of the discuss email thread thought...)



looks like we said on the KIP that the options will be of the type  Map<String,
?> commitOptions. Would we want to define something more specific?

I would also suggest to either pass an enum (as preferred by Andrew) or a newly define class `CommitOptions`.

Something like:

    public enum CommitOptions {
        /**
* Cleans any previous send errors, before committing the transaction.
         * Note, if there are pending sends, error won't be cleared.
* To avoid pending sends, you need to call {@link #flush()} before committing the transaction.
         */
        CLEAR_SEND_ERRORS
    }


Or similar for a class:

    public class CommitOptions {
        /**
* Cleans any previous send errors, before committing the transaction.
         * Note, if there are pending sends, error won't be cleared.
* To avoid pending sends, you need to call {@link #flush()} before committing the transaction.
         */
        public static CommitOptions clearSendErrors();
    }

Plus:

    public class KafkaProducer {

        /**
         * <same JavaDocs as for commitTransaction()>
         *
* This method should only be called if there are no pending writes, ie, only after calling {@link #flush()). * If there are any errors sending messaged to topics, these errors can be cleared by passing {@link CommitOptions#clearSendErrors}, allowing the transaction to commit even in case of data loss during.
         *
         * If this method is used while there are pending sends,
         * send error cannot be cleared.
         */
        public void commitTransaction(CommitOptions options);
    }

Or something like this -- we can discuss details about JavaDocs on the PR IMHO.


What I
really don't like (in any of the options), is that we cannot really
document it in a way that articulates a value in the product.  There are
tons of nuances that require understanding some buggy behavior, then fixed
behavior, then an option to sometimes turn on buggy behavior, and etc.

I would believe that users don't need to understand the full history of events, and the propose JavaDocs should go a long way.



-Matthias


On 6/25/24 5:00 PM, Justine Olshan wrote:
Hey there,

I had a few questions about the update.
Looks like we said on the KIP that the options will be of the type  Map<String,
?> commitOptions. Would we want to define something more specific?

I was also curious about this text:
The new method clears the latest error produced by `send(ProducerRecord)`
and transits the transaction back from the error state

Is it fair to say that we expect to encounter only one send response with
an error that we clear on commitTransaction? Is that solving the problem?

I think we also need to be really careful about the documentation of this
method. It should be clear that setting this option will not do anything if
there is any inflight record when the method is called.
Speaking of documentation, is it confusing that the name of the KIP is no
longer consistent with the approach the KIP takes?

Thanks,
Justine

On Tue, Jun 25, 2024 at 5:08 AM Alieh Saeedi <asae...@confluent.io.invalid>
wrote:

Hi all,

Appreciation for maintaining the momentum of our discussion.


I see kinda consensus over the main points. It seems that we agreed on the
following:

1) Define the `commitTnx(commitOptions)` to clear the error.

2) Make the user explicitly call  `flush()` before
`commitTnx(commitOptions)`, if he determines ignoring errors.


I updated the KIP with the above-mentioned points. Please take a look. I am
sure it is not perfect yet, and there are/will be some open questions, but
if you agree, I will open voting as well. Of course, the discussion can
still carry on in this thread.


Cheers,

Alieh

On Tue, Jun 25, 2024 at 11:36 AM Chris Egerton <fearthecel...@gmail.com>
wrote:

Hi Artem,

Yes, I completely agree that by default, special action shouldn't be
required from users to prevent transactions from being committed when one
or more records can't be sent. The behavior I was suggesting was only
relevant to the new API we're discussing where we allow users to
intentionally bypass that logic when invoking commitTransaction.

Cheers,

Chris

On Tue, Jun 25, 2024, 01:44 Artem Livshits <alivsh...@confluent.io
.invalid>
wrote:

Hey folks,

Great discussion!

Re: throwing exceptions from send().  send() is documented to throw
KafkaException, so if the application doesn't handle it, it should be a
bug.  Now, it does have a note that API exceptions wouldn't be thrown,
not
sure if we have code that relies on that.  There is a reason exceptions
have classes, they are designed to express a "class of errors" that can
be
handled, so that we don't have to add a flag or a new method every time
we
have a new exception to throw.  But if there is consensus that it's
still
too risky (especially if we have examples of code that gets broken),
then I
agree that we shouldn't do it.

Re: various ways to communicate semantics change.  If we must have 2
different behaviors, I think passing options to "ignore errors" to
commitTransaction is probably the most intuitive way to do it.  What I
really don't like (in any of the options), is that we cannot really
document it in a way that articulates a value in the product.  There
are
tons of nuances that require understanding some buggy behavior, then
fixed
behavior, then an option to sometimes turn on buggy behavior, and etc.

  if a user invokes Producer::abortTransaction from within a producer
callback today

I think we would get invalid state exception.  Which we could probably
fix,
but even if we supported it, I think it would be good if doing send +
commit would lead to aborted transaction without special action from
the
application -- the simple things should be really simple, any failure
during send or commit should abort send + commit sequence without
special
handling.

-Artem

On Mon, Jun 24, 2024 at 6:37 PM Chris Egerton <fearthecel...@gmail.com

wrote:

One quick thought: if a user invokes Producer::abortTransaction from
within
a producer callback today, even in the midst of an ongoing call to
Producer::commitTransaction, what is the behavior? Would it be
reasonable
to support this behavior as a way to allow error handling to take
place
during implicit flushes, via producer callback?

On Mon, Jun 24, 2024 at 9:21 PM Matthias J. Sax <mj...@apache.org>
wrote:

My point it, that it does not seem to be safe to allow users to
ignore
errors with an implicit flush, and I think it's better to only
allow
it
with (ie, after) an explicit flush().

My reasoning is, that users should make a decision to ignore errors
or
not, before calling `commitTx()`, but after inspecting all
potential
send errors. With an implicit flush, users need to "blindly" decide
to
ignore send errors, because there are pending sends and potential
errors
are not known yet, when calling `commitTx()`.



In the documentation of commitTransaction, we say if any send
throws
an
error, commitTransaction will too.

Yes. And I think we should keep it this way for an implicit flush.
With
an explicit flush, `commitTransaction()` cannot encounter any send
errors any longer.



It says that all callbacks will be executed, but we ignore the
errors
of
the callbacks.

Ah. Thanks for pointing this out. For this case it's even worse
(for
case (2)), because the user cannot inspect any errors and make any
decision to ignore or not during an implicit flush...



We shouldn't be relying on errors in the callback unless we are
calling flush, which we can still do. It seems this has always
been
the
case as well.

Yes, has always been this way, and my point is to keep it this way
(option (2) would change it), and not start to allow to ignore
errors
with an implicit flush.



-Matthias



On 6/24/24 4:57 PM, Justine Olshan wrote:
Transaction verification is a concept from KIP-890 referring to
the
verification that a partition has been added to the transaction.
It's
not a
huge deal, but maybe we don't want to overload the terminology.

For option 2, I was a little confused by this

    when commitTx is called, there is still pending Futures and
not
all Callbacks are executed yet -- with the implicit flush, we
know
that
all Callbacks are executed, but even for this case, the user
could
only
throw an exception inside the Callback to stop the TX to
eventually
commit -- Futures cannot be used to make a decision to ignore
error
and
commit or not.

In the documentation of commitTransaction, we say if any send
throws
an
error, commitTransaction will too.

*Further, if any of the {@link #send(ProducerRecord)} calls which
were
part
of the transaction hit irrecoverable errors, this method will
throw
the
last received exception immediately and the transaction will not
be
committed.*

It says that all callbacks will be executed, but we ignore the
errors
of
the callbacks.

*If the transaction is committed successfully and this method
returns
without throwing an exception, it is guaranteed that all {@link
Callback
callbacks} for records in the transaction will have been invoked
and
completed. Note that exceptions thrown by callbacks are ignored;
the
producer proceeds to commit the transaction in any case.*

Is it fair to say though that for the send errors, we can choose
to
ignore
them? II wasn't understanding where the callbacks come in with
your
comment. We shouldn't be relying on errors in the callback unless
we
are
calling flush, which we can still do. It seems this has always
been
the
case as well.

Justine

On Mon, Jun 24, 2024 at 11:07 AM Andrew Schofield <
andrew_schofi...@live.com>
wrote:

Agreed. Options 1 and 3 are safe. Option 2 is not. I’d be happy
with
3a
as
the way.

I suggest “TRANSACTION VERIFIED”.

There isn’t really precedent for options in the producer API. We
could
use
an enum,
which is easy to use and not very future-proof. Or we could use
a
class
like the
admin API does, which is cumbersome and flexible.

    CommitTransactionOptions.TRANSACTION_VERIFIED

or

    public class CommitTransactionOptions {
      public CommitTransactionOptions();

      CommitTransactionOptions transactionVerified(boolean
transactionVerified);

      boolean transactionVerified();
    }


Then 3b is:

     send(…)
     send(…)
     flush()
     commitTransaction(new
CommitTransactionOptions().transactionVerified(true))


I’d tend towards the enum here because I doubt we need as much
flexibility
as the admin API requires.

Thanks,
Andrew


On 24 Jun 2024, at 18:39, Matthias J. Sax <mj...@apache.org>
wrote:

I am ok either way (ie, flush or commit), but I think we need
to
define
exact semantics, and I think there is some subtle thing to
consider:



1) flush(Options)

Example:

   send(...)
   send(...)

   flush(ignoreErrors)

   // at this point, we know that all Futures are completed and
all
Callbacks are executed, and we can assume that all user code
checking
for
errors did execute, before `commitTx` is called

   // I consider this option as safe

   commitTx()


2) commitTx(Option)

Example:

   send(...)
   send(...)

   // when commitTx is called, there is still pending Futures
and
not
all
Callbacks are executed yet -- with the implicit flush, we know
that
all
Callbacks are executed, but even for this case, the user could
only
throw
an exception inside the Callback to stop the TX to eventually
commit
--
Futures cannot be used to make a decision to ignore error and
commit
or
not.

   // I consider this option not as safe

   commitTx(igrnoreErrors)



3a) required flush + commitTx(Option)

Example:

   send(...)
   send(...)

   flush()

   // at this point, we know that all Future are completed and
all
Callbacks are executed, and we can assume that all user code
checking
for
error did execute, before `commitTx` is called

   // I consider this option as safe

   commitTx(ignoreErrors)


3b) missing flush + commitTx(Option)

Example:

   send(...)
   send(...)

   // as flush() was not called explicitly, we should ignore
`ignoreErrors` flag and always throw an exception if the
producer
is
in
error state, because we cannot be sure that the user did all
required
check
for error handling

   commitTx(ignoreErrors)



The only issue with option (3) is, that it's more complex and
semantics
are more subtle. But it might be the a good (necessary?) bridge
between
(1)
and (2): (3) is semantically sound (we ignore errors via
passing a
flag
into commitTx() instead of flush()), and at the same time safe
(we
force
users to explicitly flush() and [hopefully] do proper error
handling,
and
don't rely in am implicit flush() during commitTx() which might
be
error
prone).

(Also need to find a good and descriptive name for the flag we
pass
into
`commitTx()` for this case.)


-Matthias



On 6/24/24 8:51 AM, Andrew Schofield wrote:
Hi Chris,
That works for me too. I slightly prefer an option on flush(),
but
what
you suggested
works too.
Thanks,
Andrew
On 24 Jun 2024, at 15:14, Chris Egerton
<chr...@aiven.io.INVALID

wrote:

Hi Andrew,

I like a lot of what you said, but I still believe it's
better
to
override
commitTransaction than flush. Users will already have to
manually
opt
in to
ignoring errors encountered during transactions, and we can
document
recommended usage (i.e., explicitly invoking flush() before
invoking
commitTransaction(ignoreRecordErrors)) in the
newly-introduced
method.
I
don't believe it's worth the increased cognitive load on
users
with
non-transactional producers to introduce an overloaded
flush()
variant.

Cheers,

Chris

On Mon, Jun 24, 2024 at 9:39 AM Andrew Schofield <
andrew_schofi...@live.com>
wrote:

Hi Alieh,
Thanks for driving this. Unfortunately, there are many parts
of
the
API
which
are a bit unfortunate and it’s tricky to make small
improvements
that
don’t have
downsides.

I don’t like the idea of using a configuration because
configuration
is
often
outside the application and changing the behaviour of
someone
else’s
application
without understanding it is risky. Anything which embeds a
transactional
producer
could have its behaviour changed unexpectedly.

It would be been much nicer if send() didn’t fail silently
and
change
the
transaction
state. But, because it’s an asynchronous operation, I don’t
really
think
we can
just make it throw all exceptions, even though I really
think
that
`send()` is the
method with the problem here.

The contract of `flush()` is that it makes sure that all
preceding
sends
will have
completed, so it should be true that a well written
application
would
be
able to
know which records were OK because of the
Future<RecordMetadata>
returned
by the `send()` method. It should be able to determine
whether
it
wants to
commit
the transaction even if some of the intended operations
didn’t
succeed.

What we don’t currently have is a way for the application to
say
to
the
KafkaProducer
that it knows the outcome of sending the records and to
confirm
that
it
wants to proceed.
Then it would not be necessary for `commitTransaction()` to
throw
an
exception to
report a historical error which the application might choose
to
ignore.

Having read the comments, I think the KIP is on the right
lines
focusing
on the `flush()`
method. My suggestion is that we introduce an option on
`flush()`
to
be
used before
`commitTransaction()` for applications that want to be able
to
commit
transactions which
had known failed operations.

The code would be:

    producer.beginTransaction();

    future1 = producer.send(goodRecord1);
    future2 = producer.send(badRecord); // The future from
this
call
will
complete exceptionally
    future3 = producer.send(goodRecord2);

    producer.flush(FlushOption.TRANSACTION_READY);

    // At this point, we know that all 3 futures are complete
and
the
transaction contains 2 records
    producer.commitTransaction();

I wouldn’t deprecate `flush()` with no option. It just uses
the
default
option which behaves
like today.

Why did I suggest an option on `flush()` rather than
`commitTransaction()`? Because with
`flush()`, it’s clear when the application is stating that
it’s
seen
all
of the results from its
`send()` calls and it’s ready to proceed. If it has to rely
on
flushing
that occurs inside
`commitTransaction()`, I don’t see it’s as clear-cut.

Thanks,
Andrew



On 24 Jun 2024, at 13:44, Alieh Saeedi
<asae...@confluent.io.INVALID

wrote:

Hi all,
Thanks for the interesting discussion.

I assume that now the main questions are as follows:

1. Do we need to transit the transcation to the error state
for
API
exceptions?
2. Should we throw the API exception in `send()` instead of
returning a
future error?
3. If the answer to question (1) is NO and to question (2)
is
YES,
do we
need to change the current `flush` or `commitTnx` at all?

Cheers,
Alieh

On Sat, Jun 22, 2024 at 3:21 AM Matthias J. Sax <
mj...@apache.org>
wrote:

Hey Kirk,

can you elaborate on a few points?

Otherwise users would have to know to explicitly change
their
code
to
invoke flush().

Why? If we would add an option to `flush(FlushOption)`,
the
existing
`flush()` w/o any option will still be there, right? If we
would
really
deprecate existing `flush()`, it would just mean that we
would
pass
"default FlushOption" into an implicit flush (and yes, we
would
need to
define what this would be).

I think there is no clear winner (as pointed out in my
last
reply),
and
both `flush(FlushOption)` and `commitTx(CommitOption)` has
advantages
and drawbacks. Guess we need to just agree on which
tradeoff
we
want to
move forward with?


Not sure if your database example is a 1:1 fit? I think,
the
better
comparison would be:

BEGIN TX;
INSERT INTO foo VALUES (’a’);
INSERT INTO foo VALUES (’b’);
INSERT INTO foo VALUES (’c’);
INSERT INTO foo VALUES (’not sure’);

For this case, the full TX would roll back, right? I still
think
that
allowing users to just skip over the last error, and
continue
the
TX
would be ok. In the end, we provide a programmatic API,
and
not
a
declarative one as SQL. Of course, default behavior would
still
be
to
put the producer into error state, and the user would need
to
call
`abortTransaction()` to move forward.


-Matthias

On 6/21/24 5:26 PM, Kirk True wrote:
Hi Matthias,

On Jun 21, 2024, at 12:28 PM, Matthias J. Sax <
mj...@apache.org

wrote:

If we want to limit it to `RecordTooLargeException`
throwing
from
`send()` directly make sense. Thanks for calling it out.

It's still a question of backward compatibility?
`send()`
does
throw
exceptions already, including generic `KafkaException`.
Not
sure
if
this
helps with backward compatibility? Could we just add a new
exception
type
(which is a child of `KafkaException`)?

The Producer JavaDocs are not totally explicit about it
IMHO.

I think we could expect that some generic error handling
path
gets
executed. For the TX-case, I would assume that a TX would
be
aborted if
`send()` throws or that the producer would be `closed()`.
Overall
this
might be safe?

It would be a little less flexible
though, since (as you note) it would still be
impossible
to
commit
transactions after errors have been reported from
brokers.

KS would still need a way to clear the error state of
the
producer. We
could catch a `RecordTooLargeException` from `send()`,
call
the
handler
and
let it decide what to do next. But if it does return
`CONTINUE`
to
swallow
the error and drop the poison pill record on the floor, we
would
want to
move forward and commit the transaction.

But the question is: if we cannot add a record to the
tx,
does
the
producer need to go into error state? In the end, we did
throw
and
inform
the app that the record was _not_ added, and it's up to
the
app
to
decide
what to do next?

That’s an excellent question…

Imagine the user’s application is writing information to
a
database
instead of Kafka. If there’s a table with a CHAR(1) column
and
this
SQL
statement was attempted, what should happen?

     INSERT INTO foo VALUES (’not sure’);

Yes, that DML would fail, sure, but would the user expect
that
the
connection used by database library would get stuck in
some
kind
of
error
state? A user would be able catch the error and either
continue
or
abort,
based on their business rules.

So I agree with what I believe you’re implying: we
shouldn’t
poison the
Producer/TransactionManager on certain types of
application-level
errors in
send().

Kirk

If we report the error only via the `Callback` it's a
different
story,
because the contract for this case is clearly specified on
the
JavaDocs:

When used as part of a transaction, it is not necessary
to
define a
callback or check the result of the future
in order to detect errors from <code>send</code>. If
any
of
the
send
calls failed with an irrecoverable error,
the final {@link #commitTransaction()} call will fail
and
throw
the
exception from the last failed send. When
this happens, your application should call {@link
#abortTransaction()}
to reset the state and continue to send
data.



-Matthias


On 6/21/24 11:42 AM, Chris Egerton wrote:
Hi Artem,
I think it'd make sense to throw directly from send
whenever
possible,
instead of returning an already-completed future. I
didn't
do
that in
my
bug fix to try to be conservative about breaking
changes
but
this
seems to
have caused its own set of headaches. It would be a
little
less
flexible
though, since (as you note) it would still be
impossible
to
commit
transactions after errors have been reported from
brokers.
I'll leave it up to the Kafka Streams folks to decide
if
that
flexibility
is required. If it is, then users could explicitly call
flush()
before
committing (and ignoring errors for) or aborting a
transaction,
if
they
want to implement fine-grained error handling logic
such
as
allowing
errors
for a subset of topics to be ignored.
Hi Matthias,
Most of the time you're right and we can't throw from
send();
however,
in
this case (client-side record-too-large exception), the
error
is
actually
noticed by the producer before send() returns, so it
should
be
possible to
throw directly.
Cheers,
Chris
On Fri, Jun 21, 2024, 14:25 Matthias J. Sax <
mj...@apache.org>
wrote:
Not sure if we can change send and make it throw,
given
that
send()
is
async? That is why users can register a `Callback` to
begin
with,
right?

And Alieh's point about backward compatibility is
also a
fair
concern.


Actually, this would potentially be even
worse than the original buggy behavior because the
bug
was
that we
ignored
errors that happened in the "send()" method itself,
not
necessarily
the
ones that we got from the broker.

My understanding was that `commitTx(swallowError)`
would
only
swallow
`send()` errors, not errors about the actually
commit. I
agree
that
it
would be very bad to swallow errors about the actual
tx
commit...

It's a fair question if this might be too subtle; to
make
it
explicit,
we could use `CommitOpions#ignorePendingSendErors()`
[working
name]
to
make it clear.


If we think it's too subtle to change commit to
swallow
send()
errors,
maybe going with `flush(FlushOptions)` would be
clearer
(and
we
can
use
`FlushOption#swallowSendErrorsForTransactions()`
[working
name]
to
be
explicitly that the `FlushOption` for now has only an
effect
for
TX).


Thoughts?


-Matthias



On 6/21/24 4:10 AM, Alieh Saeedi wrote:
Hi all,


It is very exciting to see all the experts here
raising
very
good
points.

As we go further, we see more and more options to
improve
our
solution,
which makes concluding and updating the KIP
impossible.


The main suggestions so far are:

1. `flush` with `flushOptions` as input parameter

2. `commitTx` with `commitOptions` as input parameter

3. `send` must throw the exception


My concern about the 3rd suggestion:

1. Does the change cause any issue with backward
compatibility?

2. The `send (bad record)` already transits the
transaction
to
the
error
state. No user, including Streams is able to transit
the
transaction
back
from the error state. Do you mean we remove the
`maybeTransitionToErrorState(e)` from here
<








https://github.com/apache/kafka/blob/9b5b434e2a6b2d5290ea403fc02859b1c523d8aa/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1112

as well?

Cheers,
Alieh


On Fri, Jun 21, 2024 at 8:45 AM Andrew Schofield <
andrew_schofi...@live.com>
wrote:

Hi Artem,
I think you make a good point which is worth further
consideration.
If
any of the existing methods is really ripe for a
change
here,
it’s
the
send() that actually caused the problem. If that can
be
fixed
so
there
are
no situations in which a lurking error breaks a
transaction,
that
might
be
the best.

Thanks,
Andrew

On 21 Jun 2024, at 01:51, Artem Livshits <
alivsh...@confluent.io
.INVALID>
wrote:

I thought we still wait for requests (and their
errors)
to
come
in and
could handle fatal errors appropriately.

We do wait for requests, but my understanding is
that
when
commitTransaction("ignore send errors") we want to
ignore
errors.
So
if
we
do

1. send
2. commitTransaction("ignore send errors")

the commit will succeed.  You can look at the
example
in
https://issues.apache.org/jira/browse/KAFKA-9279
and
just
substitute
commitTransaction with commitTransaction("ignore
send
errors")
and
we
get
the buggy behavior back :-).  Actually, this would
potentially be
even
worse than the original buggy behavior because the
bug
was
that
we
ignored
errors that happened in the "send()" method itself,
not
necessarily the
ones that we got from the broker.

Actually, looking at
https://github.com/apache/kafka/pull/11508/files,
wouldn't a better solution be to just throw the
error
from
the
"send"
method itself, rather than trying to set it to be
thrown
during
commit?
This way the example in
https://issues.apache.org/jira/browse/KAFKA-9279
would be fixed, and at the same time it would give
an
opportunity
for
KS
to
catch the error and ignore it if needed.  Not sure
if
we
need a
KIP for
that, just do a better fix of the old bug.

-Artem

On Thu, Jun 20, 2024 at 4:58 PM Justine Olshan
<jols...@confluent.io.invalid>
wrote:

I'm a bit late to the party, but the discussion
here
looks
reasonable.
Moving the logic to a transactional method makes
sense
to
me and
makes
me
feel a bit better about keeping the complexity in
the
methods
relevant
to
the issue.

One minor concern is that if we set "ignore send
errors" (or whatever we decide to name it) option
without
explicit
flush,
it'll actually lead to broken behavior as the
application
won't
be
able
to
stop a commit from proceeding even on fatal
errors.

Is this with respect to the case a request is
still
inflight
when
we
call
commitTransaction? I thought we still wait for
requests
(and
their
errors)
to come in and could handle fatal errors
appropriately.

Justine

On Thu, Jun 20, 2024 at 4:32 PM Artem Livshits
<alivsh...@confluent.io.invalid> wrote:

Hi Matthias (and other folks who suggested
ideas),

maybe `commitTransaction(CommitOptions)` or
similar
could
be a
good
way
forward?

I like this approach.  One minor concern is that
if
we
set
"ignore
send
errors" (or whatever we decide to name it) option
without
explicit
flush,
it'll actually lead to broken behavior as the
application
won't
be
able
to
stop a commit from proceeding even on fatal
errors.
But
I
guess
we'll
just
have to clearly document it.

In some way we are basically adding a flag to
optionally
restore
the
https://issues.apache.org/jira/browse/KAFKA-9279
bug,
which is
the
motivation for all these changes, anyway :-).

-Artem


On Thu, Jun 20, 2024 at 2:18 PM Matthias J. Sax <
mj...@apache.org>
wrote:

Seems the option to use a config does not get a
lot
of
support.

So we need to go with some form or "overload /
new
method". I
think
Chris' point about not coupling it to `flush()`
but
rather
`commitTransaction()` is actually a very good
one;
for
non-tx
case,
the
different flush variants would not make sense.

I also like Lianet's idea to pass in some
"options"
object, so
maybe
`commitTransaction(CommitOptions)` or similar
could
be a
good
way
forward? It's much better than a `boolean`
parameter,
aesthetically,
as
we as extendable in the future if necessary.

Given that we would pass in an optional
parameter,
we
might
not
even
need to deprecate the existing
`commitTransaction()`
method?



-Matthias

On 6/20/24 9:12 AM, Andrew Schofield wrote:
Hi Alieh,
Thanks for the KIP.

I *really* don’t like adding a config which
changes
the
behaviour
of
the
flush() method. We already have too many
configs.
But I
totally
understand
the problem that you’re trying to solve and
some
of
the
other
suggestions
in this thread seem neater.

Personally, I would add another method to
KafkaProducer.
Not
an
overload
on flush() because this is not flush() at all.
Using
Matthias’s
options,
I prefer (3).

Thanks,
Andrew

On 20 Jun 2024, at 15:08, Lianet M. <
liane...@gmail.com

wrote:

Hi all, thanks for the KIP Alieh!

LM1. Totally agree with Artem's point about
the
config
not
being
the
most
explicit/flexible way to express this
capability.
Getting
then to
Matthias
4 options, what I don't like about 3 and 4 is
that
it
seems
they
might
not
age very well? Aren't we going to be wanting
some
other
twist
to
the
flush
semantics that will have us adding yet another
param
to
it,
or
another
overloaded method? I truly don't have the
context
to
answer
that,
but
if it
feels like a realistic future maybe adding
some
kind
FlushOptions
params to
the flush would be better from an
extensibility
point
of
view. It
would
only have the clearErrors option available for
now
but
could
accept
any
other we may need. I find that this would
remove
the
"ugliness"
Matthias
pointed out for 3. and 4.

LM2. No matter how we end up expressing the
different
semantics
for
flush,
let's make sure we update the KIP on the flush
and
commitTransaction
java
docs. It currently states that  flush "clears
the
last
exception"
and
commitTransaction "will NOT throw" if called
after
flush,
but
it
really
all
depends on the config/options/method used.

LM3. I find it would be helpful to include an
example
to
show
the
new
flow
that we're unblocking (I see this as the great
gain
here):
flush
with
clear
error option enabled -> catch and do whatever
error
handling
we
want
->
commitTransaction successfully

Thanks!

Lianet

On Wed, Jun 19, 2024 at 11:26 PM Chris
Egerton <
fearthecel...@gmail.com

wrote:

Hi Matthias,

I like the alternatives you've listed. One
more
that
might
help
is
if,
instead of overloading flush(), we overloaded
commitTransaction()
to
something like commitTransaction(boolean
tolerateRecordErrors).
This
seems
slightly cleaner in that it takes the
behavioral
change we
want,
which
only
applies to transactional producers, to an API
method
that
is
only
used
for
transactional producers. It would also avoid
the
issue
of
whether
or
not
flush() (or a new variant of it with altered
semantics)
should
throw
or
not. Thoughts?

Hi Alieh,

Thanks for the KIP, I like this direction a
lot
more
than
the
pluggable
handler!

I share Artem's concerns that enabling this
behavior
via
configuration
doesn't seem like a great fit. It's likely
that
application
code
will
be
written in a style that only works with one
type
of
behavior
from
transactional producers, so requiring that
application
code
to
declare
its
expectations for the behavior of its producer
seems
more
appropriate
than,
e.g., allowing users deploying that
application
to
tweak a
configuration
file that gets fed to producers spun up
inside
it.

Cheers,

Chris

On Wed, Jun 19, 2024 at 10:32 PM Matthias J.
Sax
<
mj...@apache.org

wrote:

Thanks for the KIP Alieh. I actually like
the
KIP
as-is,
but
think
Arthem raises very good points...

Seems we have four options on how to move
forward?

   1. add config to allow "silent error
clearance"
as
the
KIP
proposes
   2. change flush() to clear error and let
it
throw
   3. add new flushAndThrow()` (or better
name)
which
clears
error
and
throws
   4. add `flush(boolean clearAndThrow)` and
let
user
pick
(and
deprecate
existing `flush()`)

For (2), given that it would be a behavior
change,
we
might
also
need
a
public "feature flag" config.

It seems, both (1) and (2) have the issue
Artem
mentioned.
(3)
and
(4)
would be safer to this end, however, for
both
we
kinda get
an
ugly
API?

Not sure right now if I have any preference.
Seems
we
need
to
pick
some
evil and that there is no clear best
solution?
Would
be
good to
her
from
others what they think


-Matthias


On 6/18/24 8:39 PM, Artem Livshits wrote:
Hi Alieh,

Thank you for the KIP.  I have a couple of
suggestions:

AL1.  We should throw an error from flush
after
we
clear
it.
This
would
make it so that both "send + commit" and
"send
+
flush +
commit"
(the
latter looks like just a more verbose way
to
express
the
former,
and
it
would be intuitive if it behaves the same)
would
throw if
the
transaction
has an error (so if the code is written
either
way
it's
going
be
correct).
At the same time, the latter could be
extended
by
the
caller to
intercept
exceptions from flush, ignore as needed,
and
commit
the
transaction.
This
solution would keep basic things simple (if
someone
has
code
that
doesn't
require advanced error handling, then basic
"send +
flush +
commit"
would
do the right thing) and advanced things
possible,
an
application
can
add
try + catch around flush and ignore some
errors.

AL2.  I'm not sure if config is the best
way
to
express
the
modification
of
the "flush" semantics -- the application
logic
that
calls
"flush"
needs
to
match the "flush" semantics and configuring
semantics in
a
detached
place
creates a room for bugs due to
discrepancies.
This
can
be
especially
bad
if the producer loads configuration from a
file
at
run
time, in
that
case a
mistake in configuration could break the
application
because it
was
written
to expect one "flush" semantics but the
semantics
is
switched.
Given
that
the "flush" semantics needs to match the
caller's
expectation,
a
way
to
accomplish that would be to pass the
caller's
expectation
to
the
"flush"
call by either have a method with a
different
name
or
have
an
overload
with
a Boolen flag that would configure the
semantics
(the
current
method
could
just redirect to the new one).

-Artem

On Mon, Jun 17, 2024 at 9:09 AM Alieh
Saeedi
<asae...@confluent.io.invalid>
wrote:

Hi all,

I'd like to kick off a discussion for
KIP-1059
that
suggests
adding
a
new
feature to the Producer flush() method.















<

https://cwiki.apache.org/confluence/display/KAFKA/KIP-1059%3A+Enable+the+Producer+flush%28%29+method+to+clear+the+latest+send%28%29+error




Reply via email to