Hi Chris,
That works for me too. I slightly prefer an option on flush(), but what you
suggested works too.

Thanks,
Andrew
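
For readers following along, here is a minimal sketch of the usage Chris recommends in the quoted message below: an explicit flush() followed by a commit that tolerates known record errors. ToyCommitProducer is a toy in-memory stand-in written for this illustration, not the Kafka client, and the commitTransaction(boolean ignoreRecordErrors) overload is only the shape proposed in this thread, not an existing API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Toy in-memory stand-in for a transactional producer. NOT the Kafka client API.
class ToyCommitProducer {
    private final int maxRecordSize;
    private final List<String> pending = new ArrayList<>();
    private final List<String> committed = new ArrayList<>();
    private RuntimeException lastSendError; // remembered until the next commit or begin

    ToyCommitProducer(int maxRecordSize) {
        this.maxRecordSize = maxRecordSize;
    }

    void beginTransaction() {
        pending.clear();
        lastSendError = null;
    }

    // Failures are reported through the returned future, not thrown.
    CompletableFuture<Void> send(String record) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (record.length() > maxRecordSize) {
            lastSendError = new IllegalArgumentException("record too large: " + record);
            result.completeExceptionally(lastSendError);
        } else {
            pending.add(record);
            result.complete(null);
        }
        return result;
    }

    // Sends complete synchronously in this toy, so there is nothing to wait for.
    void flush() { }

    // Default behaviour: a remembered send error fails the commit.
    void commitTransaction() {
        commitTransaction(false);
    }

    // Hypothetical overload; the parameter name follows the thread.
    void commitTransaction(boolean ignoreRecordErrors) {
        if (lastSendError != null && !ignoreRecordErrors) {
            throw new IllegalStateException("a send failed in this transaction", lastSendError);
        }
        committed.addAll(pending);
        pending.clear();
        lastSendError = null;
    }

    List<String> committed() {
        return committed;
    }
}

class CommitOptionSketch {
    static List<String> run() {
        ToyCommitProducer producer = new ToyCommitProducer(10);
        producer.beginTransaction();
        producer.send("good-1");
        CompletableFuture<Void> bad = producer.send("this-record-is-too-large");
        producer.send("good-2");

        producer.flush(); // recommended: make sure every future is complete first
        if (bad.isCompletedExceptionally()) {
            // The application has seen the failure and chooses to commit anyway.
            producer.commitTransaction(true);
        } else {
            producer.commitTransaction();
        }
        return producer.committed();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this toy model the opt-in lives entirely on the transactional method, so a non-transactional caller of flush() sees no new overloads.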

> On 24 Jun 2024, at 15:14, Chris Egerton <chr...@aiven.io.INVALID> wrote:
>
> Hi Andrew,
>
> I like a lot of what you said, but I still believe it's better to override
> commitTransaction than flush. Users will already have to manually opt in to
> ignoring errors encountered during transactions, and we can document
> recommended usage (i.e., explicitly invoking flush() before invoking
> commitTransaction(ignoreRecordErrors)) in the newly-introduced method. I
> don't believe it's worth the increased cognitive load on users with
> non-transactional producers to introduce an overloaded flush() variant.
>
> Cheers,
>
> Chris
>
> On Mon, Jun 24, 2024 at 9:39 AM Andrew Schofield <andrew_schofi...@live.com>
> wrote:
>
>> Hi Alieh,
>> Thanks for driving this. Unfortunately, there are many parts of the API
>> which
>> are a bit unfortunate and it’s tricky to make small improvements that
>> don’t have
>> downsides.
>>
>> I don’t like the idea of using a configuration because configuration is
>> often
>> outside the application and changing the behaviour of someone else’s
>> application
>> without understanding it is risky. Anything which embeds a transactional
>> producer
>> could have its behaviour changed unexpectedly.
>>
>> It would have been much nicer if send() didn’t fail silently and change the
>> transaction
>> state. But, because it’s an asynchronous operation, I don’t really think
>> we can
>> just make it throw all exceptions, even though I really think that
>> `send()` is the
>> method with the problem here.
>>
>> The contract of `flush()` is that it makes sure that all preceding sends
>> will have
>> completed, so it should be true that a well written application would be
>> able to
>> know which records were OK because of the Future<RecordMetadata> returned
>> by the `send()` method. It should be able to determine whether it wants to
>> commit
>> the transaction even if some of the intended operations didn’t succeed.
>>
>> What we don’t currently have is a way for the application to say to the
>> KafkaProducer
>> that it knows the outcome of sending the records and to confirm that it
>> wants to proceed.
>> Then it would not be necessary for `commitTransaction()` to throw an
>> exception to
>> report a historical error which the application might choose to ignore.
>>
>> Having read the comments, I think the KIP is on the right lines focusing
>> on the `flush()`
>> method. My suggestion is that we introduce an option on `flush()` to be
>> used before
>> `commitTransaction()` for applications that want to be able to commit
>> transactions which
>> had known failed operations.
>>
>> The code would be:
>>
>>   producer.beginTransaction();
>>
>>   future1 = producer.send(goodRecord1);
>>   future2 = producer.send(badRecord); // The future from this call will
>> complete exceptionally
>>   future3 = producer.send(goodRecord2);
>>
>>   producer.flush(FlushOption.TRANSACTION_READY);
>>
>>   // At this point, we know that all 3 futures are complete and the
>> transaction contains 2 records
>>   producer.commitTransaction();
>>
>> I wouldn’t deprecate `flush()` with no option. It just uses the default
>> option which behaves
>> like today.
>>
>> Why did I suggest an option on `flush()` rather than
>> `commitTransaction()`? Because with
>> `flush()`, it’s clear when the application is stating that it’s seen all
>> of the results from its
>> `send()` calls and it’s ready to proceed. If it has to rely on flushing
>> that occurs inside
>> `commitTransaction()`, I don’t see it’s as clear-cut.
>>
>> Thanks,
>> Andrew
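
The semantics Andrew describes above can be modelled with a small sketch. Everything here is a toy written for illustration: ToyFlushProducer is not the Kafka client, and ToyFlushOption.TRANSACTION_READY only mirrors the option name used in his example. The assumption being modelled is that flushing with that option acknowledges known send failures, so the following commit does not report them again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical option type; only TRANSACTION_READY is named in the thread.
enum ToyFlushOption { DEFAULT, TRANSACTION_READY }

// Toy in-memory stand-in for a transactional producer. NOT the Kafka client API.
class ToyFlushProducer {
    private final int maxRecordSize;
    private final List<String> pending = new ArrayList<>();
    private int committedCount;
    private boolean sendFailed; // set by a failed send, cleared only on request

    ToyFlushProducer(int maxRecordSize) {
        this.maxRecordSize = maxRecordSize;
    }

    void beginTransaction() {
        pending.clear();
        sendFailed = false;
    }

    CompletableFuture<Void> send(String record) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (record.length() > maxRecordSize) {
            sendFailed = true;
            result.completeExceptionally(new IllegalArgumentException("record too large"));
        } else {
            pending.add(record);
            result.complete(null);
        }
        return result;
    }

    // The no-argument flush keeps today's behaviour by using the default option.
    void flush() {
        flush(ToyFlushOption.DEFAULT);
    }

    void flush(ToyFlushOption option) {
        if (option == ToyFlushOption.TRANSACTION_READY) {
            sendFailed = false; // the application states it has seen all send results
        }
    }

    void commitTransaction() {
        if (sendFailed) {
            throw new IllegalStateException("a send failed in this transaction");
        }
        committedCount += pending.size();
        pending.clear();
    }

    void abortTransaction() {
        pending.clear();
        sendFailed = false;
    }

    int committedCount() {
        return committedCount;
    }
}

class FlushOptionSketch {
    // Returns how many records were committed for the given flush option.
    static int run(ToyFlushOption option) {
        ToyFlushProducer producer = new ToyFlushProducer(10);
        producer.beginTransaction();
        producer.send("good-1");
        producer.send("this-record-is-too-large"); // its future completes exceptionally
        producer.send("good-2");
        producer.flush(option);
        try {
            producer.commitTransaction();
        } catch (IllegalStateException e) {
            producer.abortTransaction();
        }
        return producer.committedCount();
    }

    public static void main(String[] args) {
        System.out.println(run(ToyFlushOption.TRANSACTION_READY));
        System.out.println(run(ToyFlushOption.DEFAULT));
    }
}
```

With the default option the toy commit fails and the transaction is aborted, which matches the point that the plain flush() would not need to be deprecated.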
>>
>>
>>
>>> On 24 Jun 2024, at 13:44, Alieh Saeedi <asae...@confluent.io.INVALID>
>> wrote:
>>>
>>> Hi all,
>>> Thanks for the interesting discussion.
>>>
>>> I assume that now the main questions are as follows:
>>>
>>> 1. Do we need to transition the transaction to the error state for API
>>> exceptions?
>>> 2. Should we throw the API exception in `send()` instead of returning a
>>> future error?
>>> 3. If the answer to question (1) is NO and to question (2) is YES, do we
>>> need to change the current `flush` or `commitTx` at all?
>>>
>>> Cheers,
>>> Alieh
>>>
>>> On Sat, Jun 22, 2024 at 3:21 AM Matthias J. Sax <mj...@apache.org>
>> wrote:
>>>
>>>> Hey Kirk,
>>>>
>>>> can you elaborate on a few points?
>>>>
>>>>> Otherwise users would have to know to explicitly change their code to
>>>> invoke flush().
>>>>
>>>> Why? If we would add an option to `flush(FlushOption)`, the existing
>>>> `flush()` w/o any option will still be there, right? If we would really
>>>> deprecate existing `flush()`, it would just mean that we would pass
>>>> "default FlushOption" into an implicit flush (and yes, we would need to
>>>> define what this would be).
>>>>
>>>> I think there is no clear winner (as pointed out in my last reply), and
>>>> both `flush(FlushOption)` and `commitTx(CommitOption)` have advantages
>>>> and drawbacks. Guess we need to just agree on which tradeoff we want to
>>>> move forward with?
>>>>
>>>>
>>>> Not sure if your database example is a 1:1 fit? I think, the better
>>>> comparison would be:
>>>>
>>>> BEGIN TX;
>>>> INSERT INTO foo VALUES ('a');
>>>> INSERT INTO foo VALUES ('b');
>>>> INSERT INTO foo VALUES ('c');
>>>> INSERT INTO foo VALUES ('not sure');
>>>>
>>>> For this case, the full TX would roll back, right? I still think that
>>>> allowing users to just skip over the last error, and continue the TX
>>>> would be ok. In the end, we provide a programmatic API, and not a
>>>> declarative one as SQL. Of course, default behavior would still be to
>>>> put the producer into error state, and the user would need to call
>>>> `abortTransaction()` to move forward.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 6/21/24 5:26 PM, Kirk True wrote:
>>>>> Hi Matthias,
>>>>>
>>>>>> On Jun 21, 2024, at 12:28 PM, Matthias J. Sax <mj...@apache.org>
>> wrote:
>>>>>>
>>>>>> If we want to limit it to `RecordTooLargeException` throwing from
>>>> `send()` directly makes sense. Thanks for calling it out.
>>>>>>
>>>>>> It's still a question of backward compatibility? `send()` does throw
>>>> exceptions already, including generic `KafkaException`. Not sure if this
>>>> helps with backward compatibility? Could we just add a new exception
>> type
>>>> (which is a child of `KafkaException`)?
>>>>>>
>>>>>> The Producer JavaDocs are not totally explicit about it IMHO.
>>>>>>
>>>>>> I think we could expect that some generic error handling path gets
>>>> executed. For the TX-case, I would assume that a TX would be aborted if
>>>> `send()` throws or that the producer would be `closed()`. Overall this
>>>> might be safe?
>>>>>>
>>>>>>> It would be a little less flexible
>>>>>>>> though, since (as you note) it would still be impossible to commit
>>>>>>>> transactions after errors have been reported from brokers.
>>>>>>
>>>>>> KS would still need a way to clear the error state of the producer. We
>>>> could catch a `RecordTooLargeException` from `send()`, call the handler
>> and
>>>> let it decide what to do next. But if it does return `CONTINUE` to
>> swallow
>>>> the error and drop the poison pill record on the floor, we would want to
>>>> move forward and commit the transaction.
>>>>>>
>>>>>> But the question is: if we cannot add a record to the tx, does the
>>>> producer need to go into error state? In the end, we did throw and
>> inform
>>>> the app that the record was _not_ added, and it's up to the app to
>> decide
>>>> what to do next?
>>>>>
>>>>> That’s an excellent question…
>>>>>
>>>>> Imagine the user’s application is writing information to a database
>>>> instead of Kafka. If there’s a table with a CHAR(1) column and this SQL
>>>> statement was attempted, what should happen?
>>>>>
>>>>>    INSERT INTO foo VALUES ('not sure');
>>>>>
>>>>> Yes, that DML would fail, sure, but would the user expect that the
>>>> connection used by database library would get stuck in some kind of
>> error
>>>> state? A user would be able to catch the error and either continue or
>> abort,
>>>> based on their business rules.
>>>>>
>>>>> So I agree with what I believe you’re implying: we shouldn’t poison the
>>>> Producer/TransactionManager on certain types of application-level
>> errors in
>>>> send().
>>>>>
>>>>> Kirk
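
A minimal sketch of the alternative discussed in this part of the thread: send() throws a client-side record-too-large error directly and does not poison the transaction, so the caller can decide to drop the record and still commit. ThrowingSendProducer, ToyRecordTooLargeException and ToyHandlerResponse are toy names invented for this illustration; they are not Kafka or Kafka Streams types, and the CONTINUE/FAIL choice only imitates the handler behaviour Matthias describes.

```java
import java.util.ArrayList;
import java.util.List;

// Toy exception standing in for a client-side record-too-large error.
class ToyRecordTooLargeException extends RuntimeException {
    ToyRecordTooLargeException(String message) {
        super(message);
    }
}

// Hypothetical handler outcome, modelled on the CONTINUE response in the thread.
enum ToyHandlerResponse { CONTINUE, FAIL }

// Toy in-memory stand-in for a transactional producer. NOT the Kafka client API.
class ThrowingSendProducer {
    private final int maxRecordSize;
    private final List<String> pending = new ArrayList<>();
    private final List<String> committed = new ArrayList<>();

    ThrowingSendProducer(int maxRecordSize) {
        this.maxRecordSize = maxRecordSize;
    }

    void beginTransaction() {
        pending.clear();
    }

    // Throws before returning; the transaction state is left untouched.
    void send(String record) {
        if (record.length() > maxRecordSize) {
            throw new ToyRecordTooLargeException("record too large: " + record);
        }
        pending.add(record);
    }

    void commitTransaction() {
        committed.addAll(pending);
        pending.clear();
    }

    void abortTransaction() {
        pending.clear();
    }

    List<String> committed() {
        return committed;
    }
}

class ThrowFromSendSketch {
    static List<String> run(List<String> records, ToyHandlerResponse onTooLarge) {
        ThrowingSendProducer producer = new ThrowingSendProducer(10);
        producer.beginTransaction();
        for (String record : records) {
            try {
                producer.send(record);
            } catch (ToyRecordTooLargeException e) {
                if (onTooLarge == ToyHandlerResponse.FAIL) {
                    producer.abortTransaction();
                    return producer.committed();
                }
                // CONTINUE: drop the poison-pill record and keep going.
            }
        }
        producer.commitTransaction();
        return producer.committed();
    }

    public static void main(String[] args) {
        List<String> records = List.of("a", "way-too-long-record", "b");
        System.out.println(run(records, ToyHandlerResponse.CONTINUE));
        System.out.println(run(records, ToyHandlerResponse.FAIL));
    }
}
```

Note that this only covers errors detected before send() returns; errors reported later by brokers are outside what this sketch models.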
>>>>>
>>>>>> If we report the error only via the `Callback` it's a different story,
>>>> because the contract for this case is clearly specified on the JavaDocs:
>>>>>>
>>>>>>> When used as part of a transaction, it is not necessary to define a
>>>> callback or check the result of the future
>>>>>>> in order to detect errors from <code>send</code>. If any of the send
>>>> calls failed with an irrecoverable error,
>>>>>>> the final {@link #commitTransaction()} call will fail and throw the
>>>> exception from the last failed send. When
>>>>>>> this happens, your application should call {@link
>> #abortTransaction()}
>>>> to reset the state and continue to send
>>>>>>> data.
>>>>>>
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>> On 6/21/24 11:42 AM, Chris Egerton wrote:
>>>>>>> Hi Artem,
>>>>>>> I think it'd make sense to throw directly from send whenever
>> possible,
>>>>>>> instead of returning an already-completed future. I didn't do that in
>>>> my
>>>>>>> bug fix to try to be conservative about breaking changes but this
>>>> seems to
>>>>>>> have caused its own set of headaches. It would be a little less
>>>> flexible
>>>>>>> though, since (as you note) it would still be impossible to commit
>>>>>>> transactions after errors have been reported from brokers.
>>>>>>> I'll leave it up to the Kafka Streams folks to decide if that
>>>> flexibility
>>>>>>> is required. If it is, then users could explicitly call flush()
>> before
>>>>>>> committing (and ignoring errors for) or aborting a transaction, if
>> they
>>>>>>> want to implement fine-grained error handling logic such as allowing
>>>> errors
>>>>>>> for a subset of topics to be ignored.
>>>>>>> Hi Matthias,
>>>>>>> Most of the time you're right and we can't throw from send();
>> however,
>>>> in
>>>>>>> this case (client-side record-too-large exception), the error is
>>>> actually
>>>>>>> noticed by the producer before send() returns, so it should be
>>>> possible to
>>>>>>> throw directly.
>>>>>>> Cheers,
>>>>>>> Chris
>>>>>>> On Fri, Jun 21, 2024, 14:25 Matthias J. Sax <mj...@apache.org>
>> wrote:
>>>>>>>> Not sure if we can change send and make it throw, given that send()
>> is
>>>>>>>> async? That is why users can register a `Callback` to begin with,
>>>> right?
>>>>>>>>
>>>>>>>> And Alieh's point about backward compatibility is also a fair
>> concern.
>>>>>>>>
>>>>>>>>
>>>>>>>>> Actually, this would potentially be even
>>>>>>>>> worse than the original buggy behavior because the bug was that we
>>>>>>>> ignored
>>>>>>>>> errors that happened in the "send()" method itself, not necessarily
>>>> the
>>>>>>>>> ones that we got from the broker.
>>>>>>>>
>>>>>>>> My understanding was that `commitTx(swallowError)` would only
>> swallow
>>>>>>>> `send()` errors, not errors about the actual commit. I agree that
>> it
>>>>>>>> would be very bad to swallow errors about the actual tx commit...
>>>>>>>>
>>>>>>>> It's a fair question if this might be too subtle; to make it
>> explicit,
>>>>>>>> we could use `CommitOptions#ignorePendingSendErrors()` [working name]
>> to
>>>>>>>> make it clear.
>>>>>>>>
>>>>>>>>
>>>>>>>> If we think it's too subtle to change commit to swallow send()
>> errors,
>>>>>>>> maybe going with `flush(FlushOptions)` would be clearer (and we can
>>>> use
>>>>>>>> `FlushOption#swallowSendErrorsForTransactions()` [working name] to
>> be
>>>>>>>> explicit that the `FlushOption` for now only has an effect for
>> TX).
>>>>>>>>
>>>>>>>>
>>>>>>>> Thoughts?
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On 6/21/24 4:10 AM, Alieh Saeedi wrote:
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> It is very exciting to see all the experts here raising very good
>>>> points.
>>>>>>>>>
>>>>>>>>> As we go further, we see more and more options to improve our
>>>> solution,
>>>>>>>>> which makes concluding and updating the KIP impossible.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> The main suggestions so far are:
>>>>>>>>>
>>>>>>>>> 1. `flush` with `flushOptions` as input parameter
>>>>>>>>>
>>>>>>>>> 2. `commitTx` with `commitOptions` as input parameter
>>>>>>>>>
>>>>>>>>> 3. `send` must throw the exception
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> My concern about the 3rd suggestion:
>>>>>>>>>
>>>>>>>>> 1. Does the change cause any issue with backward compatibility?
>>>>>>>>>
>>>>>>>>> 2. The `send (bad record)` already transitions the transaction to the
>>>> error
>>>>>>>>> state. No user, including Streams, is able to transition the
>> transaction
>>>> back
>>>>>>>>> from the error state. Do you mean we remove the
>>>>>>>>> `maybeTransitionToErrorState(e)` from here
>>>>>>>>> <
>>>>>>>>
>>>>
>> https://github.com/apache/kafka/blob/9b5b434e2a6b2d5290ea403fc02859b1c523d8aa/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1112
>>>>>>>>>
>>>>>>>>> as well?
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Alieh
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Jun 21, 2024 at 8:45 AM Andrew Schofield <
>>>>>>>> andrew_schofi...@live.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Artem,
>>>>>>>>>> I think you make a good point which is worth further
>> consideration.
>>>> If
>>>>>>>>>> any of the existing methods is really ripe for a change here, it’s
>>>> the
>>>>>>>>>> send() that actually caused the problem. If that can be fixed so
>>>> there
>>>>>>>> are
>>>>>>>>>> no situations in which a lurking error breaks a transaction, that
>>>> might
>>>>>>>> be
>>>>>>>>>> the best.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Andrew
>>>>>>>>>>
>>>>>>>>>>> On 21 Jun 2024, at 01:51, Artem Livshits <alivsh...@confluent.io
>>>>>>>> .INVALID>
>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I thought we still wait for requests (and their errors) to come
>>>> in and
>>>>>>>>>>> could handle fatal errors appropriately.
>>>>>>>>>>>
>>>>>>>>>>> We do wait for requests, but my understanding is that when
>>>>>>>>>>> commitTransaction("ignore send errors") we want to ignore errors.
>>>> So
>>>>>>>> if
>>>>>>>>>> we
>>>>>>>>>>> do
>>>>>>>>>>>
>>>>>>>>>>> 1. send
>>>>>>>>>>> 2. commitTransaction("ignore send errors")
>>>>>>>>>>>
>>>>>>>>>>> the commit will succeed.  You can look at the example in
>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-9279 and just
>>>> substitute
>>>>>>>>>>> commitTransaction with commitTransaction("ignore send errors")
>> and
>>>> we
>>>>>>>> get
>>>>>>>>>>> the buggy behavior back :-).  Actually, this would potentially be
>>>> even
>>>>>>>>>>> worse than the original buggy behavior because the bug was that
>> we
>>>>>>>>>> ignored
>>>>>>>>>>> errors that happened in the "send()" method itself, not
>>>> necessarily the
>>>>>>>>>>> ones that we got from the broker.
>>>>>>>>>>>
>>>>>>>>>>> Actually, looking at
>>>> https://github.com/apache/kafka/pull/11508/files,
>>>>>>>>>>> wouldn't a better solution be to just throw the error from the
>>>> "send"
>>>>>>>>>>> method itself, rather than trying to set it to be thrown during
>>>> commit?
>>>>>>>>>>> This way the example in
>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-9279
>>>>>>>>>>> would be fixed, and at the same time it would give an opportunity
>>>> for
>>>>>>>> KS
>>>>>>>>>> to
>>>>>>>>>>> catch the error and ignore it if needed.  Not sure if we need a
>>>> KIP for
>>>>>>>>>>> that, just do a better fix of the old bug.
>>>>>>>>>>>
>>>>>>>>>>> -Artem
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jun 20, 2024 at 4:58 PM Justine Olshan
>>>>>>>>>> <jols...@confluent.io.invalid>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I'm a bit late to the party, but the discussion here looks
>>>> reasonable.
>>>>>>>>>>>> Moving the logic to a transactional method makes sense to me and
>>>> makes
>>>>>>>>>> me
>>>>>>>>>>>> feel a bit better about keeping the complexity in the methods
>>>> relevant
>>>>>>>>>> to
>>>>>>>>>>>> the issue.
>>>>>>>>>>>>
>>>>>>>>>>>>> One minor concern is that if we set "ignore send
>>>>>>>>>>>> errors" (or whatever we decide to name it) option without
>> explicit
>>>>>>>>>> flush,
>>>>>>>>>>>> it'll actually lead to broken behavior as the application won't
>> be
>>>>>>>> able
>>>>>>>>>> to
>>>>>>>>>>>> stop a commit from proceeding even on fatal errors.
>>>>>>>>>>>>
>>>>>>>>>>>> Is this with respect to the case a request is still inflight
>> when
>>>> we
>>>>>>>>>> call
>>>>>>>>>>>> commitTransaction? I thought we still wait for requests (and
>> their
>>>>>>>>>> errors)
>>>>>>>>>>>> to come in and could handle fatal errors appropriately.
>>>>>>>>>>>>
>>>>>>>>>>>> Justine
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Jun 20, 2024 at 4:32 PM Artem Livshits
>>>>>>>>>>>> <alivsh...@confluent.io.invalid> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Matthias (and other folks who suggested ideas),
>>>>>>>>>>>>>
>>>>>>>>>>>>>> maybe `commitTransaction(CommitOptions)` or similar could be a
>>>> good
>>>>>>>>>>>> way
>>>>>>>>>>>>> forward?
>>>>>>>>>>>>>
>>>>>>>>>>>>> I like this approach.  One minor concern is that if we set
>>>> "ignore
>>>>>>>> send
>>>>>>>>>>>>> errors" (or whatever we decide to name it) option without
>>>> explicit
>>>>>>>>>> flush,
>>>>>>>>>>>>> it'll actually lead to broken behavior as the application won't
>>>> be
>>>>>>>> able
>>>>>>>>>>>> to
>>>>>>>>>>>>> stop a commit from proceeding even on fatal errors.  But I
>> guess
>>>>>>>> we'll
>>>>>>>>>>>> just
>>>>>>>>>>>>> have to clearly document it.
>>>>>>>>>>>>>
>>>>>>>>>>>>> In some way we are basically adding a flag to optionally
>> restore
>>>> the
>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-9279 bug, which is
>>>> the
>>>>>>>>>>>>> motivation for all these changes, anyway :-).
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Artem
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jun 20, 2024 at 2:18 PM Matthias J. Sax <
>>>> mj...@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Seems the option to use a config does not get a lot of
>> support.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> So we need to go with some form of "overload / new method". I
>>>> think
>>>>>>>>>>>>>> Chris' point about not coupling it to `flush()` but rather
>>>>>>>>>>>>>> `commitTransaction()` is actually a very good one; for non-tx
>>>> case,
>>>>>>>>>> the
>>>>>>>>>>>>>> different flush variants would not make sense.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I also like Lianet's idea to pass in some "options" object, so
>>>> maybe
>>>>>>>>>>>>>> `commitTransaction(CommitOptions)` or similar could be a good
>>>> way
>>>>>>>>>>>>>> forward? It's much better than a `boolean` parameter,
>>>> aesthetically,
>>>>>>>>>> as
>>>>>>>>>>>>>> well as extendable in the future if necessary.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Given that we would pass in an optional parameter, we might
>> not
>>>> even
>>>>>>>>>>>>>> need to deprecate the existing `commitTransaction()` method?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 6/20/24 9:12 AM, Andrew Schofield wrote:
>>>>>>>>>>>>>>> Hi Alieh,
>>>>>>>>>>>>>>> Thanks for the KIP.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I *really* don’t like adding a config which changes the
>>>> behaviour
>>>>>>>> of
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> flush() method. We already have too many configs. But I
>> totally
>>>>>>>>>>>>>> understand
>>>>>>>>>>>>>>> the problem that you’re trying to solve and some of the other
>>>>>>>>>>>>> suggestions
>>>>>>>>>>>>>>> in this thread seem neater.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Personally, I would add another method to KafkaProducer. Not
>> an
>>>>>>>>>>>>> overload
>>>>>>>>>>>>>>> on flush() because this is not flush() at all. Using
>> Matthias’s
>>>>>>>>>>>>> options,
>>>>>>>>>>>>>>> I prefer (3).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Andrew
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 20 Jun 2024, at 15:08, Lianet M. <liane...@gmail.com>
>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi all, thanks for the KIP Alieh!
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> LM1. Totally agree with Artem's point about the config not
>>>> being
>>>>>>>> the
>>>>>>>>>>>>>> most
>>>>>>>>>>>>>>>> explicit/flexible way to express this capability. Getting
>>>> then to
>>>>>>>>>>>>>> Matthias
>>>>>>>>>>>>>>>> 4 options, what I don't like about 3 and 4 is that it seems
>>>> they
>>>>>>>>>>>> might
>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>> age very well? Aren't we going to be wanting some other
>> twist
>>>> to
>>>>>>>> the
>>>>>>>>>>>>>> flush
>>>>>>>>>>>>>>>> semantics that will have us adding yet another param to it,
>> or
>>>>>>>>>>>> another
>>>>>>>>>>>>>>>> overloaded method? I truly don't have the context to answer
>>>> that,
>>>>>>>>>>>> but
>>>>>>>>>>>>>> if it
>>>>>>>>>>>>>>>> feels like a realistic future maybe adding some kind of
>>>> FlushOptions
>>>>>>>>>>>>>> params to
>>>>>>>>>>>>>>>> the flush would be better from an extensibility point of
>>>> view. It
>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>> only have the clearErrors option available for now but could
>>>>>>>> accept
>>>>>>>>>>>>> any
>>>>>>>>>>>>>>>> other we may need. I find that this would remove the
>>>> "ugliness"
>>>>>>>>>>>>> Matthias
>>>>>>>>>>>>>>>> pointed out for 3. and 4.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> LM2. No matter how we end up expressing the different
>>>> semantics
>>>>>>>> for
>>>>>>>>>>>>>> flush,
>>>>>>>>>>>>>>>> let's make sure we update the KIP on the flush and
>>>>>>>> commitTransaction
>>>>>>>>>>>>>> java
>>>>>>>>>>>>>>>> docs. It currently states that  flush "clears the last
>>>> exception"
>>>>>>>>>>>> and
>>>>>>>>>>>>>>>> commitTransaction "will NOT throw" if called after flush,
>> but
>>>> it
>>>>>>>>>>>>> really
>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>> depends on the config/options/method used.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> LM3. I find it would be helpful to include an example to
>> show
>>>> the
>>>>>>>>>>>> new
>>>>>>>>>>>>>> flow
>>>>>>>>>>>>>>>> that we're unblocking (I see this as the great gain here):
>>>> flush
>>>>>>>>>>>> with
>>>>>>>>>>>>>> clear
>>>>>>>>>>>>>>>> error option enabled -> catch and do whatever error handling
>>>> we
>>>>>>>> want
>>>>>>>>>>>>> ->
>>>>>>>>>>>>>>>> commitTransaction successfully
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Lianet
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Jun 19, 2024 at 11:26 PM Chris Egerton <
>>>>>>>>>>>>> fearthecel...@gmail.com
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I like the alternatives you've listed. One more that might
>>>> help
>>>>>>>> is
>>>>>>>>>>>>> if,
>>>>>>>>>>>>>>>>> instead of overloading flush(), we overloaded
>>>> commitTransaction()
>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> something like commitTransaction(boolean
>>>> tolerateRecordErrors).
>>>>>>>>>>>> This
>>>>>>>>>>>>>> seems
>>>>>>>>>>>>>>>>> slightly cleaner in that it takes the behavioral change we
>>>> want,
>>>>>>>>>>>>> which
>>>>>>>>>>>>>> only
>>>>>>>>>>>>>>>>> applies to transactional producers, to an API method that
>> is
>>>> only
>>>>>>>>>>>>> used
>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>> transactional producers. It would also avoid the issue of
>>>> whether
>>>>>>>>>>>> or
>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>> flush() (or a new variant of it with altered semantics)
>>>> should
>>>>>>>>>>>> throw
>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>> not. Thoughts?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Alieh,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for the KIP, I like this direction a lot more than
>> the
>>>>>>>>>>>>> pluggable
>>>>>>>>>>>>>>>>> handler!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I share Artem's concerns that enabling this behavior via
>>>>>>>>>>>>> configuration
>>>>>>>>>>>>>>>>> doesn't seem like a great fit. It's likely that application
>>>> code
>>>>>>>>>>>> will
>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>> written in a style that only works with one type of
>> behavior
>>>> from
>>>>>>>>>>>>>>>>> transactional producers, so requiring that application code
>>>> to
>>>>>>>>>>>>> declare
>>>>>>>>>>>>>> its
>>>>>>>>>>>>>>>>> expectations for the behavior of its producer seems more
>>>>>>>>>>>> appropriate
>>>>>>>>>>>>>> than,
>>>>>>>>>>>>>>>>> e.g., allowing users deploying that application to tweak a
>>>>>>>>>>>>>> configuration
>>>>>>>>>>>>>>>>> file that gets fed to producers spun up inside it.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Chris
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Jun 19, 2024 at 10:32 PM Matthias J. Sax <
>>>>>>>> mj...@apache.org
>>>>>>>>>>>>>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks for the KIP Alieh. I actually like the KIP as-is,
>> but
>>>>>>>> think
>>>>>>>>>>>>>>>>>> Artem raises very good points...
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Seems we have four options on how to move forward?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>  1. add config to allow "silent error clearance" as the
>>>> KIP
>>>>>>>>>>>>> proposes
>>>>>>>>>>>>>>>>>>  2. change flush() to clear error and let it throw
>>>>>>>>>>>>>>>>>>  3. add new `flushAndThrow()` (or better name) which
>> clears
>>>>>>>> error
>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>> throws
>>>>>>>>>>>>>>>>>>  4. add `flush(boolean clearAndThrow)` and let user pick
>>>> (and
>>>>>>>>>>>>>> deprecate
>>>>>>>>>>>>>>>>>> existing `flush()`)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> For (2), given that it would be a behavior change, we
>> might
>>>> also
>>>>>>>>>>>>> need
>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>> public "feature flag" config.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> It seems, both (1) and (2) have the issue Artem mentioned.
>>>> (3)
>>>>>>>> and
>>>>>>>>>>>>> (4)
>>>>>>>>>>>>>>>>>> would be safer to this end, however, for both we kinda get
>>>> an
>>>>>>>> ugly
>>>>>>>>>>>>>> API?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Not sure right now if I have any preference. Seems we need
>>>> to
>>>>>>>> pick
>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>> evil and that there is no clear best solution? Would be
>>>> good to
>>>>>>>>>>>> hear
>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>>>> others what they think
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On 6/18/24 8:39 PM, Artem Livshits wrote:
>>>>>>>>>>>>>>>>>>> Hi Alieh,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thank you for the KIP.  I have a couple of suggestions:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> AL1.  We should throw an error from flush after we clear
>>>> it.
>>>>>>>>>>>> This
>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>> make it so that both "send + commit" and "send + flush +
>>>>>>>> commit"
>>>>>>>>>>>>> (the
>>>>>>>>>>>>>>>>>>> latter looks like just a more verbose way to express the
>>>>>>>> former,
>>>>>>>>>>>>> and
>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>> would be intuitive if it behaves the same) would throw if
>>>> the
>>>>>>>>>>>>>>>>> transaction
>>>>>>>>>>>>>>>>>>> has an error (so if the code is written either way it's
>>>> going to
>>>>>>>> be
>>>>>>>>>>>>>>>>>> correct).
>>>>>>>>>>>>>>>>>>> At the same time, the latter could be extended by the
>>>> caller to
>>>>>>>>>>>>>>>>> intercept
>>>>>>>>>>>>>>>>>>> exceptions from flush, ignore as needed, and commit the
>>>>>>>>>>>>> transaction.
>>>>>>>>>>>>>>>>>> This
>>>>>>>>>>>>>>>>>>> solution would keep basic things simple (if someone has
>>>> code
>>>>>>>> that
>>>>>>>>>>>>>>>>> doesn't
>>>>>>>>>>>>>>>>>>> require advanced error handling, then basic "send +
>> flush +
>>>>>>>>>>>> commit"
>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>> do the right thing) and advanced things possible, an
>>>>>>>> application
>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>> add
>>>>>>>>>>>>>>>>>>> try + catch around flush and ignore some errors.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> AL2.  I'm not sure if config is the best way to express
>> the
>>>>>>>>>>>>>>>>> modification
>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>> the "flush" semantics -- the application logic that calls
>>>>>>>> "flush"
>>>>>>>>>>>>>> needs
>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> match the "flush" semantics and configuring semantics in
>> a
>>>>>>>>>>>> detached
>>>>>>>>>>>>>>>>> place
>>>>>>>>>>>>>>>>>>> creates room for bugs due to discrepancies.  This can
>> be
>>>>>>>>>>>>> especially
>>>>>>>>>>>>>>>>> bad
>>>>>>>>>>>>>>>>>>> if the producer loads configuration from a file at run
>>>> time, in
>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>> case a
>>>>>>>>>>>>>>>>>>> mistake in configuration could break the application
>>>> because it
>>>>>>>>>>>> was
>>>>>>>>>>>>>>>>>> written
>>>>>>>>>>>>>>>>>>> to expect one "flush" semantics but the semantics is
>>>> switched.
>>>>>>>>>>>>> Given
>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>> the "flush" semantics needs to match the caller's
>>>> expectation,
>>>>>>>> a
>>>>>>>>>>>>> way
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> accomplish that would be to pass the caller's expectation
>>>> to
>>>>>>>> the
>>>>>>>>>>>>>>>>> "flush"
>>>>>>>>>>>>>>>>>>> call by either having a method with a different name or
>> having
>>>> an
>>>>>>>>>>>>>> overload
>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>> a Boolean flag that would configure the semantics (the
>>>> current
>>>>>>>>>>>>> method
>>>>>>>>>>>>>>>>>> could
>>>>>>>>>>>>>>>>>>> just redirect to the new one).
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> -Artem
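
The flow suggested in AL1 can be sketched as follows, under the assumption that flush() clears the remembered send error and then throws it. ClearingFlushProducer is a toy in-memory stand-in written for this illustration and is not the Kafka client; the real flush() does not behave this way today.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory stand-in for a transactional producer. NOT the Kafka client API.
class ClearingFlushProducer {
    private final int maxRecordSize;
    private final List<String> pending = new ArrayList<>();
    private final List<String> committed = new ArrayList<>();
    private RuntimeException lastSendError;

    ClearingFlushProducer(int maxRecordSize) {
        this.maxRecordSize = maxRecordSize;
    }

    void beginTransaction() {
        pending.clear();
        lastSendError = null;
    }

    // A failed send is only remembered here; it surfaces at flush or commit.
    void send(String record) {
        if (record.length() > maxRecordSize) {
            lastSendError = new IllegalArgumentException("record too large: " + record);
        } else {
            pending.add(record);
        }
    }

    // Assumed AL1 semantics: clear the remembered error, then throw it.
    void flush() {
        RuntimeException error = lastSendError;
        lastSendError = null;
        if (error != null) {
            throw error;
        }
    }

    void commitTransaction() {
        if (lastSendError != null) {
            throw new IllegalStateException("a send failed in this transaction", lastSendError);
        }
        committed.addAll(pending);
        pending.clear();
    }

    void abortTransaction() {
        pending.clear();
        lastSendError = null;
    }

    List<String> committed() {
        return committed;
    }
}

class FlushAndThrowSketch {
    static List<String> run(boolean flushAndIgnore) {
        ClearingFlushProducer producer = new ClearingFlushProducer(10);
        producer.beginTransaction();
        producer.send("good-1");
        producer.send("this-record-is-too-large");
        producer.send("good-2");

        if (flushAndIgnore) {
            try {
                producer.flush();
            } catch (IllegalArgumentException e) {
                // Advanced path: the application inspects the error and ignores it.
            }
        }
        try {
            producer.commitTransaction();
        } catch (IllegalStateException e) {
            producer.abortTransaction(); // basic path: the error still fails the commit
        }
        return producer.committed();
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        System.out.println(run(false));
    }
}
```

In the toy, code that does not catch anything still fails on either flush or commit, while code that wraps flush in try/catch can commit the remaining records.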
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Mon, Jun 17, 2024 at 9:09 AM Alieh Saeedi
>>>>>>>>>>>>>>>>>> <asae...@confluent.io.invalid>
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I'd like to kick off a discussion for KIP-1059 that
>>>> suggests
>>>>>>>>>>>>> adding
>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>>>>>> feature to the Producer flush() method.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1059%3A+Enable+the+Producer+flush%28%29+method+to+clear+the+latest+send%28%29+error
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>> Alieh
>>
>>
>>
