Agreed. Options 1 and 3 are safe. Option 2 is not. I’d be happy with 3a as the 
way.

I suggest “TRANSACTION VERIFIED”.

There isn’t really precedent for options in the producer API. We could use an 
enum,
which is easy to use and not very future-proof. Or we could use a class like the
admin API does, which is cumbersome and flexible.

  CommitTransactionOptions.TRANSACTION_VERIFIED

or

  public class CommitTransactionOptions {
    public CommitTransactionOptions();

    CommitTransactionOptions transactionVerified(boolean transactionVerified);

    boolean transactionVerified();
  }


Then 3b is:

   send(…)
   send(…)
   flush()
   commitTransaction(new CommitTransactionOptions().transactionVerified(true))


I’d tend towards the enum here because I doubt we need as much flexibility
as the admin API requires.

Thanks,
Andrew


> On 24 Jun 2024, at 18:39, Matthias J. Sax <mj...@apache.org> wrote:
>
> I am ok either way (ie, flush or commit), but I think we need to define exact 
> semantics, and I think there is some subtle thing to consider:
>
>
>
> 1) flush(Options)
>
> Example:
>
>  send(...)
>  send(...)
>
>  flush(ignoreErrors)
>
>  // at this point, we know that all Futures are completed and all Callbacks 
> are executed, and we can assume that all user code checking for errors did 
> execute, before `commitTx` is called
>
>  // I consider this option as safe
>
>  commitTx()
>
>
> 2) commitTx(Option)
>
> Example:
>
>  send(...)
>  send(...)
>
>  // when commitTx is called, there is still pending Futures and not all 
> Callbacks are executed yet -- with the implicit flush, we know that all 
> Callbacks are executed, but even for this case, the user could only throw an 
> exception inside the Callback to stop the TX to eventually commit -- Futures 
> cannot be used to make a decision to ignore error and commit or not.
>
>  // I consider this option not as safe
>
>  commitTx(igrnoreErrors)
>
>
>
> 3a) required flush + commitTx(Option)
>
> Example:
>
>  send(...)
>  send(...)
>
>  flush()
>
>  // at this point, we know that all Future are completed and all Callbacks 
> are executed, and we can assume that all user code checking for error did 
> execute, before `commitTx` is called
>
>  // I consider this option as safe
>
>  commitTx(ignoreErrors)
>
>
> 3b) missing flush + commitTx(Option)
>
> Example:
>
>  send(...)
>  send(...)
>
>  // as flush() was not called explicitly, we should ignore `ignoreErrors` 
> flag and always throw an exception if the producer is in error state, because 
> we cannot be sure that the user did all required check for error handling
>
>  commitTx(ignoreErrors)
>
>
>
> The only issue with option (3) is, that it's more complex and semantics are 
> more subtle. But it might be the a good (necessary?) bridge between (1) and 
> (2): (3) is semantically sound (we ignore errors via passing a flag into 
> commitTx() instead of flush()), and at the same time safe (we force users to 
> explicitly flush() and [hopefully] do proper error handling, and don't rely 
> in am implicit flush() during commitTx() which might be error prone).
>
> (Also need to find a good and descriptive name for the flag we pass into 
> `commitTx()` for this case.)
>
>
> -Matthias
>
>
>
> On 6/24/24 8:51 AM, Andrew Schofield wrote:
>> Hi Chris,
>> That works for me too. I slightly prefer an option on flush(), but what you 
>> suggested
>> works too.
>> Thanks,
>> Andrew
>>> On 24 Jun 2024, at 15:14, Chris Egerton <chr...@aiven.io.INVALID> wrote:
>>>
>>> Hi Andrew,
>>>
>>> I like a lot of what you said, but I still believe it's better to override
>>> commitTransaction than flush. Users will already have to manually opt in to
>>> ignoring errors encountered during transactions, and we can document
>>> recommended usage (i.e., explicitly invoking flush() before invoking
>>> commitTransaction(ignoreRecordErrors)) in the newly-introduced method. I
>>> don't believe it's worth the increased cognitive load on users with
>>> non-transactional producers to introduce an overloaded flush() variant.
>>>
>>> Cheers,
>>>
>>> Chris
>>>
>>> On Mon, Jun 24, 2024 at 9:39 AM Andrew Schofield <andrew_schofi...@live.com>
>>> wrote:
>>>
>>>> Hi Alieh,
>>>> Thanks for driving this. Unfortunately, there are many parts of the API
>>>> which
>>>> are a bit unfortunate and it’s tricky to make small improvements that
>>>> don’t have
>>>> downsides.
>>>>
>>>> I don’t like the idea of using a configuration because configuration is
>>>> often
>>>> outside the application and changing the behaviour of someone else’s
>>>> application
>>>> without understanding it is risky. Anything which embeds a transactional
>>>> producer
>>>> could have its behaviour changed unexpectedly.
>>>>
>>>> It would be been much nicer if send() didn’t fail silently and change the
>>>> transaction
>>>> state. But, because it’s an asynchronous operation, I don’t really think
>>>> we can
>>>> just make it throw all exceptions, even though I really think that
>>>> `send()` is the
>>>> method with the problem here.
>>>>
>>>> The contract of `flush()` is that it makes sure that all preceding sends
>>>> will have
>>>> completed, so it should be true that a well written application would be
>>>> able to
>>>> know which records were OK because of the Future<RecordMetadata> returned
>>>> by the `send()` method. It should be able to determine whether it wants to
>>>> commit
>>>> the transaction even if some of the intended operations didn’t succeed.
>>>>
>>>> What we don’t currently have is a way for the application to say to the
>>>> KafkaProducer
>>>> that it knows the outcome of sending the records and to confirm that it
>>>> wants to proceed.
>>>> Then it would not be necessary for `commitTransaction()` to throw an
>>>> exception to
>>>> report a historical error which the application might choose to ignore.
>>>>
>>>> Having read the comments, I think the KIP is on the right lines focusing
>>>> on the `flush()`
>>>> method. My suggestion is that we introduce an option on `flush()` to be
>>>> used before
>>>> `commitTransaction()` for applications that want to be able to commit
>>>> transactions which
>>>> had known failed operations.
>>>>
>>>> The code would be:
>>>>
>>>>   producer.beginTransaction();
>>>>
>>>>   future1 = producer.send(goodRecord1);
>>>>   future2 = producer.send(badRecord); // The future from this call will
>>>> complete exceptionally
>>>>   future3 = producer.send(goodRecord2);
>>>>
>>>>   producer.flush(FlushOption.TRANSACTION_READY);
>>>>
>>>>   // At this point, we know that all 3 futures are complete and the
>>>> transaction contains 2 records
>>>>   producer.commitTransaction();
>>>>
>>>> I wouldn’t deprecate `flush()` with no option. It just uses the default
>>>> option which behaves
>>>> like today.
>>>>
>>>> Why did I suggest an option on `flush()` rather than
>>>> `commitTransaction()`? Because with
>>>> `flush()`, it’s clear when the application is stating that it’s seen all
>>>> of the results from its
>>>> `send()` calls and it’s ready to proceed. If it has to rely on flushing
>>>> that occurs inside
>>>> `commitTransaction()`, I don’t see it’s as clear-cut.
>>>>
>>>> Thanks,
>>>> Andrew
>>>>
>>>>
>>>>
>>>>> On 24 Jun 2024, at 13:44, Alieh Saeedi <asae...@confluent.io.INVALID>
>>>> wrote:
>>>>>
>>>>> Hi all,
>>>>> Thanks for the interesting discussion.
>>>>>
>>>>> I assume that now the main questions are as follows:
>>>>>
>>>>> 1. Do we need to transit the transcation to the error state for API
>>>>> exceptions?
>>>>> 2. Should we throw the API exception in `send()` instead of returning a
>>>>> future error?
>>>>> 3. If the answer to question (1) is NO and to question (2) is YES, do we
>>>>> need to change the current `flush` or `commitTnx` at all?
>>>>>
>>>>> Cheers,
>>>>> Alieh
>>>>>
>>>>> On Sat, Jun 22, 2024 at 3:21 AM Matthias J. Sax <mj...@apache.org>
>>>> wrote:
>>>>>
>>>>>> Hey Kirk,
>>>>>>
>>>>>> can you elaborate on a few points?
>>>>>>
>>>>>>> Otherwise users would have to know to explicitly change their code to
>>>>>> invoke flush().
>>>>>>
>>>>>> Why? If we would add an option to `flush(FlushOption)`, the existing
>>>>>> `flush()` w/o any option will still be there, right? If we would really
>>>>>> deprecate existing `flush()`, it would just mean that we would pass
>>>>>> "default FlushOption" into an implicit flush (and yes, we would need to
>>>>>> define what this would be).
>>>>>>
>>>>>> I think there is no clear winner (as pointed out in my last reply), and
>>>>>> both `flush(FlushOption)` and `commitTx(CommitOption)` has advantages
>>>>>> and drawbacks. Guess we need to just agree on which tradeoff we want to
>>>>>> move forward with?
>>>>>>
>>>>>>
>>>>>> Not sure if your database example is a 1:1 fit? I think, the better
>>>>>> comparison would be:
>>>>>>
>>>>>> BEGIN TX;
>>>>>> INSERT INTO foo VALUES (’a’);
>>>>>> INSERT INTO foo VALUES (’b’);
>>>>>> INSERT INTO foo VALUES (’c’);
>>>>>> INSERT INTO foo VALUES (’not sure’);
>>>>>>
>>>>>> For this case, the full TX would roll back, right? I still think that
>>>>>> allowing users to just skip over the last error, and continue the TX
>>>>>> would be ok. In the end, we provide a programmatic API, and not a
>>>>>> declarative one as SQL. Of course, default behavior would still be to
>>>>>> put the producer into error state, and the user would need to call
>>>>>> `abortTransaction()` to move forward.
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 6/21/24 5:26 PM, Kirk True wrote:
>>>>>>> Hi Matthias,
>>>>>>>
>>>>>>>> On Jun 21, 2024, at 12:28 PM, Matthias J. Sax <mj...@apache.org>
>>>> wrote:
>>>>>>>>
>>>>>>>> If we want to limit it to `RecordTooLargeException` throwing from
>>>>>> `send()` directly make sense. Thanks for calling it out.
>>>>>>>>
>>>>>>>> It's still a question of backward compatibility? `send()` does throw
>>>>>> exceptions already, including generic `KafkaException`. Not sure if this
>>>>>> helps with backward compatibility? Could we just add a new exception
>>>> type
>>>>>> (which is a child of `KafkaException`)?
>>>>>>>>
>>>>>>>> The Producer JavaDocs are not totally explicit about it IMHO.
>>>>>>>>
>>>>>>>> I think we could expect that some generic error handling path gets
>>>>>> executed. For the TX-case, I would assume that a TX would be aborted if
>>>>>> `send()` throws or that the producer would be `closed()`. Overall this
>>>>>> might be safe?
>>>>>>>>
>>>>>>>>> It would be a little less flexible
>>>>>>>>>> though, since (as you note) it would still be impossible to commit
>>>>>>>>>> transactions after errors have been reported from brokers.
>>>>>>>>
>>>>>>>> KS would still need a way to clear the error state of the producer. We
>>>>>> could catch a `RecordTooLargeException` from `send()`, call the handler
>>>> and
>>>>>> let it decide what to do next. But if it does return `CONTINUE` to
>>>> swallow
>>>>>> the error and drop the poison pill record on the floor, we would want to
>>>>>> move forward and commit the transaction.
>>>>>>>>
>>>>>>>> But the question is: if we cannot add a record to the tx, does the
>>>>>> producer need to go into error state? In the end, we did throw and
>>>> inform
>>>>>> the app that the record was _not_ added, and it's up to the app to
>>>> decide
>>>>>> what to do next?
>>>>>>>
>>>>>>> That’s an excellent question…
>>>>>>>
>>>>>>> Imagine the user’s application is writing information to a database
>>>>>> instead of Kafka. If there’s a table with a CHAR(1) column and this SQL
>>>>>> statement was attempted, what should happen?
>>>>>>>
>>>>>>>    INSERT INTO foo VALUES (’not sure’);
>>>>>>>
>>>>>>> Yes, that DML would fail, sure, but would the user expect that the
>>>>>> connection used by database library would get stuck in some kind of
>>>> error
>>>>>> state? A user would be able catch the error and either continue or
>>>> abort,
>>>>>> based on their business rules.
>>>>>>>
>>>>>>> So I agree with what I believe you’re implying: we shouldn’t poison the
>>>>>> Producer/TransactionManager on certain types of application-level
>>>> errors in
>>>>>> send().
>>>>>>>
>>>>>>> Kirk
>>>>>>>
>>>>>>>> If we report the error only via the `Callback` it's a different story,
>>>>>> because the contract for this case is clearly specified on the JavaDocs:
>>>>>>>>
>>>>>>>>> When used as part of a transaction, it is not necessary to define a
>>>>>> callback or check the result of the future
>>>>>>>>> in order to detect errors from <code>send</code>. If any of the send
>>>>>> calls failed with an irrecoverable error,
>>>>>>>>> the final {@link #commitTransaction()} call will fail and throw the
>>>>>> exception from the last failed send. When
>>>>>>>>> this happens, your application should call {@link
>>>> #abortTransaction()}
>>>>>> to reset the state and continue to send
>>>>>>>>> data.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>>
>>>>>>>> On 6/21/24 11:42 AM, Chris Egerton wrote:
>>>>>>>>> Hi Artem,
>>>>>>>>> I think it'd make sense to throw directly from send whenever
>>>> possible,
>>>>>>>>> instead of returning an already-completed future. I didn't do that in
>>>>>> my
>>>>>>>>> bug fix to try to be conservative about breaking changes but this
>>>>>> seems to
>>>>>>>>> have caused its own set of headaches. It would be a little less
>>>>>> flexible
>>>>>>>>> though, since (as you note) it would still be impossible to commit
>>>>>>>>> transactions after errors have been reported from brokers.
>>>>>>>>> I'll leave it up to the Kafka Streams folks to decide if that
>>>>>> flexibility
>>>>>>>>> is required. If it is, then users could explicitly call flush()
>>>> before
>>>>>>>>> committing (and ignoring errors for) or aborting a transaction, if
>>>> they
>>>>>>>>> want to implement fine-grained error handling logic such as allowing
>>>>>> errors
>>>>>>>>> for a subset of topics to be ignored.
>>>>>>>>> Hi Matthias,
>>>>>>>>> Most of the time you're right and we can't throw from send();
>>>> however,
>>>>>> in
>>>>>>>>> this case (client-side record-too-large exception), the error is
>>>>>> actually
>>>>>>>>> noticed by the producer before send() returns, so it should be
>>>>>> possible to
>>>>>>>>> throw directly.
>>>>>>>>> Cheers,
>>>>>>>>> Chris
>>>>>>>>> On Fri, Jun 21, 2024, 14:25 Matthias J. Sax <mj...@apache.org>
>>>> wrote:
>>>>>>>>>> Not sure if we can change send and make it throw, given that send()
>>>> is
>>>>>>>>>> async? That is why users can register a `Callback` to begin with,
>>>>>> right?
>>>>>>>>>>
>>>>>>>>>> And Alieh's point about backward compatibility is also a fair
>>>> concern.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> Actually, this would potentially be even
>>>>>>>>>>> worse than the original buggy behavior because the bug was that we
>>>>>>>>>> ignored
>>>>>>>>>>> errors that happened in the "send()" method itself, not necessarily
>>>>>> the
>>>>>>>>>>> ones that we got from the broker.
>>>>>>>>>>
>>>>>>>>>> My understanding was that `commitTx(swallowError)` would only
>>>> swallow
>>>>>>>>>> `send()` errors, not errors about the actually commit. I agree that
>>>> it
>>>>>>>>>> would be very bad to swallow errors about the actual tx commit...
>>>>>>>>>>
>>>>>>>>>> It's a fair question if this might be too subtle; to make it
>>>> explicit,
>>>>>>>>>> we could use `CommitOpions#ignorePendingSendErors()` [working name]
>>>> to
>>>>>>>>>> make it clear.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> If we think it's too subtle to change commit to swallow send()
>>>> errors,
>>>>>>>>>> maybe going with `flush(FlushOptions)` would be clearer (and we can
>>>>>> use
>>>>>>>>>> `FlushOption#swallowSendErrorsForTransactions()` [working name] to
>>>> be
>>>>>>>>>> explicitly that the `FlushOption` for now has only an effect for
>>>> TX).
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thoughts?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 6/21/24 4:10 AM, Alieh Saeedi wrote:
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> It is very exciting to see all the experts here raising very good
>>>>>> points.
>>>>>>>>>>>
>>>>>>>>>>> As we go further, we see more and more options to improve our
>>>>>> solution,
>>>>>>>>>>> which makes concluding and updating the KIP impossible.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> The main suggestions so far are:
>>>>>>>>>>>
>>>>>>>>>>> 1. `flush` with `flushOptions` as input parameter
>>>>>>>>>>>
>>>>>>>>>>> 2. `commitTx` with `commitOptions` as input parameter
>>>>>>>>>>>
>>>>>>>>>>> 3. `send` must throw the exception
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> My concern about the 3rd suggestion:
>>>>>>>>>>>
>>>>>>>>>>> 1. Does the change cause any issue with backward compatibility?
>>>>>>>>>>>
>>>>>>>>>>> 2. The `send (bad record)` already transits the transaction to the
>>>>>> error
>>>>>>>>>>> state. No user, including Streams is able to transit the
>>>> transaction
>>>>>> back
>>>>>>>>>>> from the error state. Do you mean we remove the
>>>>>>>>>>> `maybeTransitionToErrorState(e)` from here
>>>>>>>>>>> <
>>>>>>>>>>
>>>>>>
>>>> https://github.com/apache/kafka/blob/9b5b434e2a6b2d5290ea403fc02859b1c523d8aa/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1112
>>>>>>>>>>>
>>>>>>>>>>> as well?
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Alieh
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jun 21, 2024 at 8:45 AM Andrew Schofield <
>>>>>>>>>> andrew_schofi...@live.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Artem,
>>>>>>>>>>>> I think you make a good point which is worth further
>>>> consideration.
>>>>>> If
>>>>>>>>>>>> any of the existing methods is really ripe for a change here, it’s
>>>>>> the
>>>>>>>>>>>> send() that actually caused the problem. If that can be fixed so
>>>>>> there
>>>>>>>>>> are
>>>>>>>>>>>> no situations in which a lurking error breaks a transaction, that
>>>>>> might
>>>>>>>>>> be
>>>>>>>>>>>> the best.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Andrew
>>>>>>>>>>>>
>>>>>>>>>>>>> On 21 Jun 2024, at 01:51, Artem Livshits <alivsh...@confluent.io
>>>>>>>>>> .INVALID>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I thought we still wait for requests (and their errors) to come
>>>>>> in and
>>>>>>>>>>>>> could handle fatal errors appropriately.
>>>>>>>>>>>>>
>>>>>>>>>>>>> We do wait for requests, but my understanding is that when
>>>>>>>>>>>>> commitTransaction("ignore send errors") we want to ignore errors.
>>>>>> So
>>>>>>>>>> if
>>>>>>>>>>>> we
>>>>>>>>>>>>> do
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. send
>>>>>>>>>>>>> 2. commitTransaction("ignore send errors")
>>>>>>>>>>>>>
>>>>>>>>>>>>> the commit will succeed.  You can look at the example in
>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-9279 and just
>>>>>> substitute
>>>>>>>>>>>>> commitTransaction with commitTransaction("ignore send errors")
>>>> and
>>>>>> we
>>>>>>>>>> get
>>>>>>>>>>>>> the buggy behavior back :-).  Actually, this would potentially be
>>>>>> even
>>>>>>>>>>>>> worse than the original buggy behavior because the bug was that
>>>> we
>>>>>>>>>>>> ignored
>>>>>>>>>>>>> errors that happened in the "send()" method itself, not
>>>>>> necessarily the
>>>>>>>>>>>>> ones that we got from the broker.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Actually, looking at
>>>>>> https://github.com/apache/kafka/pull/11508/files,
>>>>>>>>>>>>> wouldn't a better solution be to just throw the error from the
>>>>>> "send"
>>>>>>>>>>>>> method itself, rather than trying to set it to be thrown during
>>>>>> commit?
>>>>>>>>>>>>> This way the example in
>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-9279
>>>>>>>>>>>>> would be fixed, and at the same time it would give an opportunity
>>>>>> for
>>>>>>>>>> KS
>>>>>>>>>>>> to
>>>>>>>>>>>>> catch the error and ignore it if needed.  Not sure if we need a
>>>>>> KIP for
>>>>>>>>>>>>> that, just do a better fix of the old bug.
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Artem
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jun 20, 2024 at 4:58 PM Justine Olshan
>>>>>>>>>>>> <jols...@confluent.io.invalid>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm a bit late to the party, but the discussion here looks
>>>>>> reasonable.
>>>>>>>>>>>>>> Moving the logic to a transactional method makes sense to me and
>>>>>> makes
>>>>>>>>>>>> me
>>>>>>>>>>>>>> feel a bit better about keeping the complexity in the methods
>>>>>> relevant
>>>>>>>>>>>> to
>>>>>>>>>>>>>> the issue.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> One minor concern is that if we set "ignore send
>>>>>>>>>>>>>> errors" (or whatever we decide to name it) option without
>>>> explicit
>>>>>>>>>>>> flush,
>>>>>>>>>>>>>> it'll actually lead to broken behavior as the application won't
>>>> be
>>>>>>>>>> able
>>>>>>>>>>>> to
>>>>>>>>>>>>>> stop a commit from proceeding even on fatal errors.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Is this with respect to the case a request is still inflight
>>>> when
>>>>>> we
>>>>>>>>>>>> call
>>>>>>>>>>>>>> commitTransaction? I thought we still wait for requests (and
>>>> their
>>>>>>>>>>>> errors)
>>>>>>>>>>>>>> to come in and could handle fatal errors appropriately.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Justine
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Jun 20, 2024 at 4:32 PM Artem Livshits
>>>>>>>>>>>>>> <alivsh...@confluent.io.invalid> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Matthias (and other folks who suggested ideas),
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> maybe `commitTransaction(CommitOptions)` or similar could be a
>>>>>> good
>>>>>>>>>>>>>> way
>>>>>>>>>>>>>>> forward?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I like this approach.  One minor concern is that if we set
>>>>>> "ignore
>>>>>>>>>> send
>>>>>>>>>>>>>>> errors" (or whatever we decide to name it) option without
>>>>>> explicit
>>>>>>>>>>>> flush,
>>>>>>>>>>>>>>> it'll actually lead to broken behavior as the application won't
>>>>>> be
>>>>>>>>>> able
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>> stop a commit from proceeding even on fatal errors.  But I
>>>> guess
>>>>>>>>>> we'll
>>>>>>>>>>>>>> just
>>>>>>>>>>>>>>> have to clearly document it.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In some way we are basically adding a flag to optionally
>>>> restore
>>>>>> the
>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-9279 bug, which is
>>>>>> the
>>>>>>>>>>>>>>> motivation for all these changes, anyway :-).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -Artem
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Jun 20, 2024 at 2:18 PM Matthias J. Sax <
>>>>>> mj...@apache.org>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Seems the option to use a config does not get a lot of
>>>> support.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> So we need to go with some form or "overload / new method". I
>>>>>> think
>>>>>>>>>>>>>>>> Chris' point about not coupling it to `flush()` but rather
>>>>>>>>>>>>>>>> `commitTransaction()` is actually a very good one; for non-tx
>>>>>> case,
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> different flush variants would not make sense.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I also like Lianet's idea to pass in some "options" object, so
>>>>>> maybe
>>>>>>>>>>>>>>>> `commitTransaction(CommitOptions)` or similar could be a good
>>>>>> way
>>>>>>>>>>>>>>>> forward? It's much better than a `boolean` parameter,
>>>>>> aesthetically,
>>>>>>>>>>>> as
>>>>>>>>>>>>>>>> we as extendable in the future if necessary.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Given that we would pass in an optional parameter, we might
>>>> not
>>>>>> even
>>>>>>>>>>>>>>>> need to deprecate the existing `commitTransaction()` method?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 6/20/24 9:12 AM, Andrew Schofield wrote:
>>>>>>>>>>>>>>>>> Hi Alieh,
>>>>>>>>>>>>>>>>> Thanks for the KIP.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I *really* don’t like adding a config which changes the
>>>>>> behaviour
>>>>>>>>>> of
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> flush() method. We already have too many configs. But I
>>>> totally
>>>>>>>>>>>>>>>> understand
>>>>>>>>>>>>>>>>> the problem that you’re trying to solve and some of the other
>>>>>>>>>>>>>>> suggestions
>>>>>>>>>>>>>>>>> in this thread seem neater.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Personally, I would add another method to KafkaProducer. Not
>>>> an
>>>>>>>>>>>>>>> overload
>>>>>>>>>>>>>>>>> on flush() because this is not flush() at all. Using
>>>> Matthias’s
>>>>>>>>>>>>>>> options,
>>>>>>>>>>>>>>>>> I prefer (3).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Andrew
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On 20 Jun 2024, at 15:08, Lianet M. <liane...@gmail.com>
>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi all, thanks for the KIP Alieh!
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> LM1. Totally agree with Artem's point about the config not
>>>>>> being
>>>>>>>>>> the
>>>>>>>>>>>>>>>> most
>>>>>>>>>>>>>>>>>> explicit/flexible way to express this capability. Getting
>>>>>> then to
>>>>>>>>>>>>>>>> Matthias
>>>>>>>>>>>>>>>>>> 4 options, what I don't like about 3 and 4 is that it seems
>>>>>> they
>>>>>>>>>>>>>> might
>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>> age very well? Aren't we going to be wanting some other
>>>> twist
>>>>>> to
>>>>>>>>>> the
>>>>>>>>>>>>>>>> flush
>>>>>>>>>>>>>>>>>> semantics that will have us adding yet another param to it,
>>>> or
>>>>>>>>>>>>>> another
>>>>>>>>>>>>>>>>>> overloaded method? I truly don't have the context to answer
>>>>>> that,
>>>>>>>>>>>>>> but
>>>>>>>>>>>>>>>> if it
>>>>>>>>>>>>>>>>>> feels like a realistic future maybe adding some kind
>>>>>> FlushOptions
>>>>>>>>>>>>>>>> params to
>>>>>>>>>>>>>>>>>> the flush would be better from an extensibility point of
>>>>>> view. It
>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>> only have the clearErrors option available for now but could
>>>>>>>>>> accept
>>>>>>>>>>>>>>> any
>>>>>>>>>>>>>>>>>> other we may need. I find that this would remove the
>>>>>> "ugliness"
>>>>>>>>>>>>>>> Matthias
>>>>>>>>>>>>>>>>>> pointed out for 3. and 4.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> LM2. No matter how we end up expressing the different
>>>>>> semantics
>>>>>>>>>> for
>>>>>>>>>>>>>>>> flush,
>>>>>>>>>>>>>>>>>> let's make sure we update the KIP on the flush and
>>>>>>>>>> commitTransaction
>>>>>>>>>>>>>>>> java
>>>>>>>>>>>>>>>>>> docs. It currently states that  flush "clears the last
>>>>>> exception"
>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>> commitTransaction "will NOT throw" if called after flush,
>>>> but
>>>>>> it
>>>>>>>>>>>>>>> really
>>>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>>>> depends on the config/options/method used.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> LM3. I find it would be helpful to include an example to
>>>> show
>>>>>> the
>>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>> flow
>>>>>>>>>>>>>>>>>> that we're unblocking (I see this as the great gain here):
>>>>>> flush
>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>> clear
>>>>>>>>>>>>>>>>>> error option enabled -> catch and do whatever error handling
>>>>>> we
>>>>>>>>>> want
>>>>>>>>>>>>>>> ->
>>>>>>>>>>>>>>>>>> commitTransaction successfully
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Lianet
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Wed, Jun 19, 2024 at 11:26 PM Chris Egerton <
>>>>>>>>>>>>>>> fearthecel...@gmail.com
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I like the alternatives you've listed. One more that might
>>>>>> help
>>>>>>>>>> is
>>>>>>>>>>>>>>> if,
>>>>>>>>>>>>>>>>>>> instead of overloading flush(), we overloaded
>>>>>> commitTransaction()
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> something like commitTransaction(boolean
>>>>>> tolerateRecordErrors).
>>>>>>>>>>>>>> This
>>>>>>>>>>>>>>>> seems
>>>>>>>>>>>>>>>>>>> slightly cleaner in that it takes the behavioral change we
>>>>>> want,
>>>>>>>>>>>>>>> which
>>>>>>>>>>>>>>>> only
>>>>>>>>>>>>>>>>>>> applies to transactional producers, to an API method that
>>>> is
>>>>>> only
>>>>>>>>>>>>>>> used
>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>> transactional producers. It would also avoid the issue of
>>>>>> whether
>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>> flush() (or a new variant of it with altered semantics)
>>>>>> should
>>>>>>>>>>>>>> throw
>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>>>> not. Thoughts?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Alieh,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks for the KIP, I like this direction a lot more than
>>>> the
>>>>>>>>>>>>>>> pluggable
>>>>>>>>>>>>>>>>>>> handler!
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I share Artem's concerns that enabling this behavior via
>>>>>>>>>>>>>>> configuration
>>>>>>>>>>>>>>>>>>> doesn't seem like a great fit. It's likely that application
>>>>>> code
>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>> written in a style that only works with one type of
>>>> behavior
>>>>>> from
>>>>>>>>>>>>>>>>>>> transactional producers, so requiring that application code
>>>>>> to
>>>>>>>>>>>>>>> declare
>>>>>>>>>>>>>>>> its
>>>>>>>>>>>>>>>>>>> expectations for the behavior of its producer seems more
>>>>>>>>>>>>>> appropriate
>>>>>>>>>>>>>>>> than,
>>>>>>>>>>>>>>>>>>> e.g., allowing users deploying that application to tweak a
>>>>>>>>>>>>>>>> configuration
>>>>>>>>>>>>>>>>>>> file that gets fed to producers spun up inside it.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Chris
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Wed, Jun 19, 2024 at 10:32 PM Matthias J. Sax <
>>>>>>>>>> mj...@apache.org
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks for the KIP Alieh. I actually like the KIP as-is,
>>>> but
>>>>>>>>>> think
>>>>>>>>>>>>>>>>>>>> Arthem raises very good points...
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Seems we have four options on how to move forward?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>  1. add config to allow "silent error clearance" as the
>>>>>> KIP
>>>>>>>>>>>>>>> proposes
>>>>>>>>>>>>>>>>>>>>  2. change flush() to clear error and let it throw
>>>>>>>>>>>>>>>>>>>>  3. add new flushAndThrow()` (or better name) which
>>>> clears
>>>>>>>>>> error
>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>> throws
>>>>>>>>>>>>>>>>>>>>  4. add `flush(boolean clearAndThrow)` and let user pick
>>>>>> (and
>>>>>>>>>>>>>>>> deprecate
>>>>>>>>>>>>>>>>>>>> existing `flush()`)
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> For (2), given that it would be a behavior change, we
>>>> might
>>>>>> also
>>>>>>>>>>>>>>> need
>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>> public "feature flag" config.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> It seems, both (1) and (2) have the issue Artem mentioned.
>>>>>> (3)
>>>>>>>>>> and
>>>>>>>>>>>>>>> (4)
>>>>>>>>>>>>>>>>>>>> would be safer to this end, however, for both we kinda get
>>>>>> an
>>>>>>>>>> ugly
>>>>>>>>>>>>>>>> API?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Not sure right now if I have any preference. Seems we need
>>>>>> to
>>>>>>>>>> pick
>>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>>>> evil and that there is no clear best solution? Would be
>>>>>> good to
>>>>>>>>>>>>>> her
>>>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>>>>>> others what they think
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On 6/18/24 8:39 PM, Artem Livshits wrote:
>>>>>>>>>>>>>>>>>>>>> Hi Alieh,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thank you for the KIP.  I have a couple of suggestions:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> AL1.  We should throw an error from flush after we clear
>>>>>> it.
>>>>>>>>>>>>>> This
>>>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>>>> make it so that both "send + commit" and "send + flush +
>>>>>>>>>> commit"
>>>>>>>>>>>>>>> (the
>>>>>>>>>>>>>>>>>>>>> latter looks like just a more verbose way to express the
>>>>>>>>>> former,
>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>>>> would be intuitive if it behaves the same) would throw if
>>>>>> the
>>>>>>>>>>>>>>>>>>> transaction
>>>>>>>>>>>>>>>>>>>>> has an error (so if the code is written either way it's
>>>>>> going
>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>> correct).
>>>>>>>>>>>>>>>>>>>>> At the same time, the latter could be extended by the
>>>>>> caller to
>>>>>>>>>>>>>>>>>>> intercept
>>>>>>>>>>>>>>>>>>>>> exceptions from flush, ignore as needed, and commit the
>>>>>>>>>>>>>>> transaction.
>>>>>>>>>>>>>>>>>>>> This
>>>>>>>>>>>>>>>>>>>>> solution would keep basic things simple (if someone has
>>>>>> code
>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>> doesn't
>>>>>>>>>>>>>>>>>>>>> require advanced error handling, then basic "send +
>>>> flush +
>>>>>>>>>>>>>> commit"
>>>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>>>> do the right thing) and advanced things possible, an
>>>>>>>>>> application
>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>> add
>>>>>>>>>>>>>>>>>>>>> try + catch around flush and ignore some errors.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> AL2.  I'm not sure if config is the best way to express
>>>> the
>>>>>>>>>>>>>>>>>>> modification
>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>> the "flush" semantics -- the application logic that calls
>>>>>>>>>> "flush"
>>>>>>>>>>>>>>>> needs
>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>> match the "flush" semantics and configuring semantics in
>>>> a
>>>>>>>>>>>>>> detached
>>>>>>>>>>>>>>>>>>> place
>>>>>>>>>>>>>>>>>>>>> creates a room for bugs due to discrepancies.  This can
>>>> be
>>>>>>>>>>>>>>> especially
>>>>>>>>>>>>>>>>>>> bad
>>>>>>>>>>>>>>>>>>>>> if the producer loads configuration from a file at run
>>>>>> time, in
>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>> case a
>>>>>>>>>>>>>>>>>>>>> mistake in configuration could break the application
>>>>>> because it
>>>>>>>>>>>>>> was
>>>>>>>>>>>>>>>>>>>> written
>>>>>>>>>>>>>>>>>>>>> to expect one "flush" semantics but the semantics is
>>>>>> switched.
>>>>>>>>>>>>>>> Given
>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>> the "flush" semantics needs to match the caller's
>>>>>> expectation,
>>>>>>>>>> a
>>>>>>>>>>>>>>> way
>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>> accomplish that would be to pass the caller's expectation
>>>>>> to
>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> "flush"
>>>>>>>>>>>>>>>>>>>>> call by either have a method with a different name or
>>>> have
>>>>>> an
>>>>>>>>>>>>>>>> overload
>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>> a Boolen flag that would configure the semantics (the
>>>>>> current
>>>>>>>>>>>>>>> method
>>>>>>>>>>>>>>>>>>>> could
>>>>>>>>>>>>>>>>>>>>> just redirect to the new one).
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> -Artem
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Mon, Jun 17, 2024 at 9:09 AM Alieh Saeedi
>>>>>>>>>>>>>>>>>>>> <asae...@confluent.io.invalid>
>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I'd like to kick off a discussion for KIP-1059 that
>>>>>> suggests
>>>>>>>>>>>>>>> adding
>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>>>>>>>> feature to the Producer flush() method.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1059%3A+Enable+the+Producer+flush%28%29+method+to+clear+the+latest+send%28%29+error
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>> Alieh


Reply via email to