Agreed. Options 1 and 3 are safe. Option 2 is not. I’d be happy with 3a as the way forward.
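The rule behind options 3a and 3b in Matthias's quoted message can be sketched as a small, self-contained Java model. Everything in it is hypothetical. `ToyTxProducer` is not `KafkaProducer`, `CommitOption.TRANSACTION_VERIFIED` only borrows the name suggested in this thread, and a failing `send` is simulated with a boolean. The sketch shows the guard Matthias describes: the option is honored only when `flush()` was called explicitly after the last `send()`. Without that flush (3b), the commit throws as it does today.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical commit option; TRANSACTION_VERIFIED borrows the name suggested in this thread.
enum CommitOption { DEFAULT, TRANSACTION_VERIFIED }

// Toy model of a transactional producer: no I/O, not the real KafkaProducer.
class ToyTxProducer {
    private final List<String> pending = new ArrayList<>();
    private final List<String> committed = new ArrayList<>();
    private RuntimeException lastSendError;      // error left behind by a failed send
    private boolean flushedSinceLastSend = true; // reset by every send()

    void send(String record, boolean fails) {
        flushedSinceLastSend = false;
        if (fails) {
            lastSendError = new RuntimeException("send failed: " + record);
        } else {
            pending.add(record);
        }
    }

    void flush() {
        // The real flush() blocks until every future is complete; here we only record that it ran.
        flushedSinceLastSend = true;
    }

    void commitTransaction(CommitOption option) {
        // 3a: the option is honored only after an explicit flush() following the last send().
        // 3b: without that flush, the option is ignored and the send error is thrown, as today.
        boolean verified = option == CommitOption.TRANSACTION_VERIFIED && flushedSinceLastSend;
        if (lastSendError != null && !verified) {
            throw lastSendError;
        }
        committed.addAll(pending);
        pending.clear();
        lastSendError = null;
    }

    List<String> committed() {
        return committed;
    }
}

class ThreeAThreeBDemo {
    static String run(boolean explicitFlush) {
        ToyTxProducer producer = new ToyTxProducer();
        producer.send("a", false);
        producer.send("bad", true);
        producer.send("b", false);
        if (explicitFlush) {
            producer.flush();
        }
        try {
            producer.commitTransaction(CommitOption.TRANSACTION_VERIFIED);
            return "committed " + producer.committed();
        } catch (RuntimeException e) {
            return "threw: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // 3a: committed [a, b]
        System.out.println(run(false)); // 3b: threw: send failed: bad
    }
}
```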
I suggest “TRANSACTION VERIFIED”. There isn’t really precedent for options in the producer API. We could use an enum, which is easy to use but not very future-proof. Or we could use a class like the admin API does, which is cumbersome but flexible.

    CommitTransactionOptions.TRANSACTION_VERIFIED

or

    public class CommitTransactionOptions {
        public CommitTransactionOptions();
        CommitTransactionOptions transactionVerified(boolean transactionVerified);
        boolean transactionVerified();
    }

Then 3a is:

    send(…)
    send(…)
    flush()
    commitTransaction(new CommitTransactionOptions().transactionVerified(true))

I’d tend towards the enum here because I doubt we need as much flexibility as the admin API requires.

Thanks,
Andrew

> On 24 Jun 2024, at 18:39, Matthias J. Sax <mj...@apache.org> wrote:
>
> I am OK either way (i.e., flush or commit), but I think we need to define exact semantics, and there is a subtle thing to consider:
>
> 1) flush(Options)
>
> Example:
>
> send(...)
> send(...)
>
> flush(ignoreErrors)
>
> // at this point, we know that all Futures are completed and all Callbacks are executed, and we can assume that all user code checking for errors did execute before `commitTx` is called
>
> // I consider this option safe
>
> commitTx()
>
> 2) commitTx(Option)
>
> Example:
>
> send(...)
> send(...)
>
> // when commitTx is called, there are still pending Futures and not all Callbacks have executed yet -- with the implicit flush, we know that all Callbacks are executed, but even in this case, the user could only throw an exception inside the Callback to stop the TX from eventually committing -- Futures cannot be used to decide whether to ignore an error and commit or not.
>
> // I consider this option not as safe
>
> commitTx(ignoreErrors)
>
> 3a) required flush + commitTx(Option)
>
> Example:
>
> send(...)
> send(...)
>
> flush()
>
> // at this point, we know that all Futures are completed and all Callbacks are executed, and we can assume that all user code checking for errors did execute before `commitTx` is called
>
> // I consider this option safe
>
> commitTx(ignoreErrors)
>
> 3b) missing flush + commitTx(Option)
>
> Example:
>
> send(...)
> send(...)
>
> // as flush() was not called explicitly, we should ignore the `ignoreErrors` flag and always throw an exception if the producer is in an error state, because we cannot be sure that the user did all the required error-handling checks
>
> commitTx(ignoreErrors)
>
> The only issue with option (3) is that it's more complex and its semantics are more subtle. But it might be a good (necessary?) bridge between (1) and (2): (3) is semantically sound (we ignore errors by passing a flag into commitTx() instead of flush()), and at the same time safe (we force users to explicitly flush() and [hopefully] do proper error handling, and don't rely on an implicit flush() during commitTx(), which might be error-prone).
>
> (We also need to find a good, descriptive name for the flag we pass into `commitTx()` for this case.)
>
> -Matthias
>
> On 6/24/24 8:51 AM, Andrew Schofield wrote:
>> Hi Chris,
>> That works for me too. I slightly prefer an option on flush(), but what you suggested works too.
>> Thanks,
>> Andrew
>>> On 24 Jun 2024, at 15:14, Chris Egerton <chr...@aiven.io.INVALID> wrote:
>>>
>>> Hi Andrew,
>>>
>>> I like a lot of what you said, but I still believe it's better to overload commitTransaction than flush. Users will already have to manually opt in to ignoring errors encountered during transactions, and we can document the recommended usage (i.e., explicitly invoking flush() before invoking commitTransaction(ignoreRecordErrors)) in the docs for the newly introduced method. I don't believe it's worth the increased cognitive load on users with non-transactional producers to introduce an overloaded flush() variant.
>>>
>>> Cheers,
>>>
>>> Chris
>>>
>>> On Mon, Jun 24, 2024 at 9:39 AM Andrew Schofield <andrew_schofi...@live.com> wrote:
>>>
>>>> Hi Alieh,
>>>> Thanks for driving this. Unfortunately, there are many parts of the API which are a bit unfortunate, and it’s tricky to make small improvements that don’t have downsides.
>>>>
>>>> I don’t like the idea of using a configuration because configuration is often outside the application, and changing the behaviour of someone else’s application without understanding it is risky. Anything which embeds a transactional producer could have its behaviour changed unexpectedly.
>>>>
>>>> It would have been much nicer if send() didn’t fail silently and change the transaction state. But, because it’s an asynchronous operation, I don’t really think we can just make it throw all exceptions, even though I really think that `send()` is the method with the problem here.
>>>>
>>>> The contract of `flush()` is that it makes sure that all preceding sends will have completed, so it should be true that a well-written application would be able to know which records were OK because of the Future<RecordMetadata> returned by the `send()` method. It should be able to determine whether it wants to commit the transaction even if some of the intended operations didn’t succeed.
>>>>
>>>> What we don’t currently have is a way for the application to tell the KafkaProducer that it knows the outcome of sending the records and to confirm that it wants to proceed. Then it would not be necessary for `commitTransaction()` to throw an exception to report a historical error which the application might choose to ignore.
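Andrew's point that the futures returned by `send()` already tell a well-written application which records failed can be sketched with plain JDK types. This is an illustration under assumptions. `CompletableFuture` stands in for the `Future<RecordMetadata>` returned by `send()`, `FutureOutcomes` is a hypothetical helper, and the "at most one failure" policy is invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class FutureOutcomes {
    // Returns the indices of sends whose futures completed exceptionally.
    // Assumes flush() has already returned, so every future is done and get() does not block.
    static List<Integer> failedIndices(List<? extends Future<?>> futures) {
        List<Integer> failed = new ArrayList<>();
        for (int i = 0; i < futures.size(); i++) {
            Future<?> f = futures.get(i);
            if (!f.isDone()) {
                throw new IllegalStateException("send " + i + " still pending; flush() first");
            }
            try {
                f.get();
            } catch (ExecutionException e) {
                failed.add(i); // e.getCause() holds the send error
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        // CompletableFuture stands in for the Future<RecordMetadata> returned by send().
        List<CompletableFuture<String>> futures = List.of(
                CompletableFuture.completedFuture("goodRecord1"),
                CompletableFuture.failedFuture(new RuntimeException("badRecord rejected")),
                CompletableFuture.completedFuture("goodRecord2"));
        List<Integer> failed = failedIndices(futures);
        System.out.println("failed sends: " + failed); // failed sends: [1]
        // The application applies its own rule, e.g. commit only if at most one send failed.
        System.out.println(failed.size() <= 1 ? "commit" : "abort"); // commit
    }
}
```

What such a check cannot do today is tell the producer that the application has seen these outcomes. That missing signal is the gap the proposed flush or commit option would fill.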
>>>>
>>>> Having read the comments, I think the KIP is on the right lines in focusing on the `flush()` method. My suggestion is that we introduce an option on `flush()` to be used before `commitTransaction()` for applications that want to be able to commit transactions which had known failed operations.
>>>>
>>>> The code would be:
>>>>
>>>>     producer.beginTransaction();
>>>>
>>>>     future1 = producer.send(goodRecord1);
>>>>     future2 = producer.send(badRecord); // The future from this call will complete exceptionally
>>>>     future3 = producer.send(goodRecord2);
>>>>
>>>>     producer.flush(FlushOption.TRANSACTION_READY);
>>>>
>>>>     // At this point, we know that all 3 futures are complete and the transaction contains 2 records
>>>>     producer.commitTransaction();
>>>>
>>>> I wouldn’t deprecate `flush()` with no option. It just uses the default option, which behaves like today.
>>>>
>>>> Why did I suggest an option on `flush()` rather than `commitTransaction()`? Because with `flush()`, it’s clear when the application is stating that it’s seen all of the results from its `send()` calls and it’s ready to proceed. If it has to rely on flushing that occurs inside `commitTransaction()`, I don’t see it as being as clear-cut.
>>>>
>>>> Thanks,
>>>> Andrew
>>>>
>>>>> On 24 Jun 2024, at 13:44, Alieh Saeedi <asae...@confluent.io.INVALID> wrote:
>>>>>
>>>>> Hi all,
>>>>> Thanks for the interesting discussion.
>>>>>
>>>>> I assume that now the main questions are as follows:
>>>>>
>>>>> 1. Do we need to transition the transaction to the error state for API exceptions?
>>>>> 2. Should we throw the API exception in `send()` instead of returning a failed future?
>>>>> 3. If the answer to question (1) is NO and to question (2) is YES, do we need to change the current `flush` or `commitTx` at all?
>>>>>
>>>>> Cheers,
>>>>> Alieh
>>>>>
>>>>> On Sat, Jun 22, 2024 at 3:21 AM Matthias J. Sax <mj...@apache.org> wrote:
>>>>>
>>>>>> Hey Kirk,
>>>>>>
>>>>>> Can you elaborate on a few points?
>>>>>>
>>>>>>> Otherwise users would have to know to explicitly change their code to invoke flush().
>>>>>>
>>>>>> Why? If we add an option via `flush(FlushOption)`, the existing `flush()` without any option will still be there, right? If we really deprecated the existing `flush()`, it would just mean that we would pass a "default FlushOption" into an implicit flush (and yes, we would need to define what this would be).
>>>>>>
>>>>>> I think there is no clear winner (as pointed out in my last reply), and both `flush(FlushOption)` and `commitTx(CommitOption)` have advantages and drawbacks. I guess we just need to agree on which tradeoff we want to move forward with?
>>>>>>
>>>>>> Not sure if your database example is a 1:1 fit? I think the better comparison would be:
>>>>>>
>>>>>>     BEGIN TX;
>>>>>>     INSERT INTO foo VALUES ('a');
>>>>>>     INSERT INTO foo VALUES ('b');
>>>>>>     INSERT INTO foo VALUES ('c');
>>>>>>     INSERT INTO foo VALUES ('not sure');
>>>>>>
>>>>>> For this case, the full TX would roll back, right? I still think that allowing users to just skip over the last error and continue the TX would be OK. In the end, we provide a programmatic API, not a declarative one like SQL. Of course, the default behavior would still be to put the producer into an error state, and the user would need to call `abortTransaction()` to move forward.
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 6/21/24 5:26 PM, Kirk True wrote:
>>>>>>> Hi Matthias,
>>>>>>>
>>>>>>>> On Jun 21, 2024, at 12:28 PM, Matthias J. Sax <mj...@apache.org> wrote:
>>>>>>>>
>>>>>>>> If we want to limit it to `RecordTooLargeException`, throwing from `send()` directly makes sense. Thanks for calling it out.
>>>>>>>>
>>>>>>>> It's still a question of backward compatibility? `send()` does throw exceptions already, including generic `KafkaException`. Not sure if this helps with backward compatibility? Could we just add a new exception type (which is a child of `KafkaException`)?
>>>>>>>>
>>>>>>>> The Producer JavaDocs are not totally explicit about it, IMHO.
>>>>>>>>
>>>>>>>> I think we could expect that some generic error-handling path gets executed. For the TX case, I would assume that a TX would be aborted if `send()` throws, or that the producer would be closed via `close()`. Overall this might be safe?
>>>>>>>>
>>>>>>>>> It would be a little less flexible though, since (as you note) it would still be impossible to commit transactions after errors have been reported from brokers.
>>>>>>>>
>>>>>>>> KS would still need a way to clear the error state of the producer. We could catch a `RecordTooLargeException` from `send()`, call the handler and let it decide what to do next. But if it does return `CONTINUE` to swallow the error and drop the poison-pill record on the floor, we would want to move forward and commit the transaction.
>>>>>>>>
>>>>>>>> But the question is: if we cannot add a record to the TX, does the producer need to go into an error state? In the end, we did throw and inform the app that the record was _not_ added, and it's up to the app to decide what to do next?
>>>>>>>
>>>>>>> That’s an excellent question…
>>>>>>>
>>>>>>> Imagine the user’s application is writing information to a database instead of Kafka. If there’s a table with a CHAR(1) column and this SQL statement was attempted, what should happen?
>>>>>>>
>>>>>>>     INSERT INTO foo VALUES ('not sure');
>>>>>>>
>>>>>>> Yes, that DML would fail, sure, but would the user expect that the connection used by the database library would get stuck in some kind of error state? A user would be able to catch the error and either continue or abort, based on their business rules.
>>>>>>>
>>>>>>> So I agree with what I believe you’re implying: we shouldn’t poison the Producer/TransactionManager on certain types of application-level errors in send().
>>>>>>>
>>>>>>> Kirk
>>>>>>>
>>>>>>>> If we report the error only via the `Callback`, it's a different story, because the contract for this case is clearly specified in the JavaDocs:
>>>>>>>>
>>>>>>>>> When used as part of a transaction, it is not necessary to define a callback or check the result of the future in order to detect errors from <code>send</code>. If any of the send calls failed with an irrecoverable error, the final {@link #commitTransaction()} call will fail and throw the exception from the last failed send. When this happens, your application should call {@link #abortTransaction()} to reset the state and continue to send data.
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 6/21/24 11:42 AM, Chris Egerton wrote:
>>>>>>>>> Hi Artem,
>>>>>>>>> I think it'd make sense to throw directly from send whenever possible, instead of returning an already-completed future. I didn't do that in my bug fix to try to be conservative about breaking changes, but this seems to have caused its own set of headaches. It would be a little less flexible though, since (as you note) it would still be impossible to commit transactions after errors have been reported from brokers.
>>>>>>>>> I'll leave it up to the Kafka Streams folks to decide if that flexibility is required. If it is, then users could explicitly call flush() before committing (and ignoring errors for) or aborting a transaction, if they want to implement fine-grained error-handling logic such as allowing errors for a subset of topics to be ignored.
>>>>>>>>> Hi Matthias,
>>>>>>>>> Most of the time you're right and we can't throw from send(); however, in this case (client-side record-too-large exception), the error is actually noticed by the producer before send() returns, so it should be possible to throw directly.
>>>>>>>>> Cheers,
>>>>>>>>> Chris
>>>>>>>>> On Fri, Jun 21, 2024, 14:25 Matthias J. Sax <mj...@apache.org> wrote:
>>>>>>>>>> Not sure if we can change send and make it throw, given that send() is async? That is why users can register a `Callback` to begin with, right?
>>>>>>>>>>
>>>>>>>>>> And Alieh's point about backward compatibility is also a fair concern.
>>>>>>>>>>
>>>>>>>>>>> Actually, this would potentially be even worse than the original buggy behavior because the bug was that we ignored errors that happened in the "send()" method itself, not necessarily the ones that we got from the broker.
>>>>>>>>>>
>>>>>>>>>> My understanding was that `commitTx(swallowError)` would only swallow `send()` errors, not errors about the actual commit. I agree that it would be very bad to swallow errors about the actual TX commit...
>>>>>>>>>>
>>>>>>>>>> It's a fair question whether this might be too subtle; to make it explicit, we could use `CommitOptions#ignorePendingSendErrors()` [working name] to make it clear.
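The two option shapes discussed in this thread can be compared side by side in a minimal sketch: Matthias's `CommitOptions` object here, and Andrew's enum-versus-class comparison at the top of the thread. None of these types exist in the Kafka client. The enum is renamed `CommitTransactionOption` so it can coexist with the class in one file. The class copies the fluent setter/getter pair from Andrew's listing.

```java
// Shape 1 (hypothetical): an enum constant, as in CommitTransactionOptions.TRANSACTION_VERIFIED.
enum CommitTransactionOption { DEFAULT, TRANSACTION_VERIFIED }

// Shape 2 (hypothetical): an admin-client-style options class with fluent accessors.
class CommitTransactionOptions {
    private boolean transactionVerified = false;

    CommitTransactionOptions transactionVerified(boolean transactionVerified) {
        this.transactionVerified = transactionVerified;
        return this; // fluent: enables new CommitTransactionOptions().transactionVerified(true)
    }

    boolean transactionVerified() {
        return transactionVerified;
    }
}

class CommitOptionShapes {
    // How a hypothetical commitTransaction overload might read each shape.
    static boolean ignoresSendErrors(CommitTransactionOption option) {
        return option == CommitTransactionOption.TRANSACTION_VERIFIED;
    }

    static boolean ignoresSendErrors(CommitTransactionOptions options) {
        return options.transactionVerified();
    }

    public static void main(String[] args) {
        System.out.println(ignoresSendErrors(CommitTransactionOption.TRANSACTION_VERIFIED));            // true
        System.out.println(ignoresSendErrors(new CommitTransactionOptions().transactionVerified(true))); // true
        System.out.println(ignoresSendErrors(new CommitTransactionOptions()));                          // false
    }
}
```

The enum keeps call sites short but can only express fixed choices. The class can later gain further settings without yet another `commitTransaction` overload. That is the extensibility trade-off raised in this thread.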
>>>>>>>>>>
>>>>>>>>>> If we think it's too subtle to change commit to swallow send() errors, maybe going with `flush(FlushOptions)` would be clearer (and we can use `FlushOption#swallowSendErrorsForTransactions()` [working name] to be explicit that the `FlushOption` for now only has an effect for TX).
>>>>>>>>>>
>>>>>>>>>> Thoughts?
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 6/21/24 4:10 AM, Alieh Saeedi wrote:
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> It is very exciting to see all the experts here raising very good points.
>>>>>>>>>>>
>>>>>>>>>>> As we go further, we see more and more options to improve our solution, which makes concluding and updating the KIP impossible.
>>>>>>>>>>>
>>>>>>>>>>> The main suggestions so far are:
>>>>>>>>>>>
>>>>>>>>>>> 1. `flush` with `flushOptions` as input parameter
>>>>>>>>>>> 2. `commitTx` with `commitOptions` as input parameter
>>>>>>>>>>> 3. `send` must throw the exception
>>>>>>>>>>>
>>>>>>>>>>> My concerns about the 3rd suggestion:
>>>>>>>>>>>
>>>>>>>>>>> 1. Does the change cause any issue with backward compatibility?
>>>>>>>>>>> 2. The `send(bad record)` already transitions the transaction to the error state. No user, including Streams, is able to transition the transaction back from the error state. Do you mean we remove the `maybeTransitionToErrorState(e)` from here <https://github.com/apache/kafka/blob/9b5b434e2a6b2d5290ea403fc02859b1c523d8aa/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1112> as well?
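Alieh's question contrasts two behaviors. A client-side send failure can keep poisoning the transaction, or, as Chris and Artem suggest, it can simply be thrown from `send()`. A toy model makes the difference visible. It is hypothetical throughout. `RecordTooLarge` stands in for `RecordTooLargeException`, the size check is simplified to string length, and setting `txError` only mimics conceptually what `maybeTransitionToErrorState(e)` does. It is not that method's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for Kafka's RecordTooLargeException.
class RecordTooLarge extends RuntimeException {
    RecordTooLarge(String msg) { super(msg); }
}

class SendErrorModes {
    enum Mode { POISON_TRANSACTION, THROW_FROM_SEND }

    private final Mode mode;
    private final int maxSize;
    private final List<String> batch = new ArrayList<>();
    private RuntimeException txError; // non-null once the toy transaction is "poisoned"

    SendErrorModes(Mode mode, int maxSize) {
        this.mode = mode;
        this.maxSize = maxSize;
    }

    CompletableFuture<Void> send(String record) {
        if (record.length() > maxSize) {
            RecordTooLarge e = new RecordTooLarge("record too large: " + record.length());
            if (mode == Mode.THROW_FROM_SEND) {
                throw e;  // caller sees the error immediately; transaction state is untouched
            }
            txError = e;  // conceptually like moving the transaction into an error state
            return CompletableFuture.failedFuture(e);
        }
        batch.add(record);
        return CompletableFuture.completedFuture(null);
    }

    List<String> commitTransaction() {
        if (txError != null) {
            throw txError;
        }
        return new ArrayList<>(batch);
    }

    // Sends three records, dropping any that throw from send(), then tries to commit.
    static String run(Mode mode) {
        SendErrorModes producer = new SendErrorModes(mode, 5);
        for (String r : List.of("a", "way-too-big", "b")) {
            try {
                producer.send(r);
            } catch (RecordTooLarge e) {
                // application decision: drop the poison-pill record and continue
            }
        }
        try {
            return "committed " + producer.commitTransaction();
        } catch (RecordTooLarge e) {
            return "commit failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(Mode.POISON_TRANSACTION)); // commit failed: record too large: 11
        System.out.println(run(Mode.THROW_FROM_SEND));    // committed [a, b]
    }
}
```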
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Alieh
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jun 21, 2024 at 8:45 AM Andrew Schofield <andrew_schofi...@live.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Artem,
>>>>>>>>>>>> I think you make a good point which is worth further consideration. If any of the existing methods is really ripe for a change here, it’s the send() that actually caused the problem. If that can be fixed so there are no situations in which a lurking error breaks a transaction, that might be the best.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Andrew
>>>>>>>>>>>>
>>>>>>>>>>>>> On 21 Jun 2024, at 01:51, Artem Livshits <alivsh...@confluent.io.INVALID> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I thought we still wait for requests (and their errors) to come in and could handle fatal errors appropriately.
>>>>>>>>>>>>>
>>>>>>>>>>>>> We do wait for requests, but my understanding is that when we call commitTransaction("ignore send errors"), we want to ignore errors. So if we do
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. send
>>>>>>>>>>>>> 2. commitTransaction("ignore send errors")
>>>>>>>>>>>>>
>>>>>>>>>>>>> the commit will succeed. You can look at the example in https://issues.apache.org/jira/browse/KAFKA-9279 and just substitute commitTransaction with commitTransaction("ignore send errors"), and we get the buggy behavior back :-). Actually, this would potentially be even worse than the original buggy behavior because the bug was that we ignored errors that happened in the "send()" method itself, not necessarily the ones that we got from the broker.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Actually, looking at https://github.com/apache/kafka/pull/11508/files, wouldn't a better solution be to just throw the error from the "send" method itself, rather than trying to set it to be thrown during commit? This way the example in https://issues.apache.org/jira/browse/KAFKA-9279 would be fixed, and at the same time it would give KS an opportunity to catch the error and ignore it if needed. Not sure if we need a KIP for that; we could just do a better fix of the old bug.
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Artem
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jun 20, 2024 at 4:58 PM Justine Olshan <jols...@confluent.io.invalid> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm a bit late to the party, but the discussion here looks reasonable. Moving the logic to a transactional method makes sense to me and makes me feel a bit better about keeping the complexity in the methods relevant to the issue.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> One minor concern is that if we set the "ignore send errors" (or whatever we decide to name it) option without an explicit flush, it'll actually lead to broken behavior, as the application won't be able to stop a commit from proceeding even on fatal errors.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Is this with respect to the case where a request is still in flight when we call commitTransaction? I thought we still wait for requests (and their errors) to come in and could handle fatal errors appropriately.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Justine
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Jun 20, 2024 at 4:32 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Matthias (and other folks who suggested ideas),
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> maybe `commitTransaction(CommitOptions)` or similar could be a good way forward?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I like this approach. One minor concern is that if we set the "ignore send errors" (or whatever we decide to name it) option without an explicit flush, it'll actually lead to broken behavior, as the application won't be able to stop a commit from proceeding even on fatal errors. But I guess we'll just have to clearly document it.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In some way we are basically adding a flag to optionally restore the https://issues.apache.org/jira/browse/KAFKA-9279 bug, which is the motivation for all these changes, anyway :-).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -Artem
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Jun 20, 2024 at 2:18 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Seems the option to use a config does not get a lot of support.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> So we need to go with some form of "overload / new method". I think Chris' point about not coupling it to `flush()` but rather to `commitTransaction()` is actually a very good one; for the non-TX case, the different flush variants would not make sense.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I also like Lianet's idea to pass in some "options" object, so maybe `commitTransaction(CommitOptions)` or similar could be a good way forward? It's much better than a `boolean` parameter, aesthetically, as well as being extensible in the future if necessary.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Given that we would pass in an optional parameter, we might not even need to deprecate the existing `commitTransaction()` method?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 6/20/24 9:12 AM, Andrew Schofield wrote:
>>>>>>>>>>>>>>>>> Hi Alieh,
>>>>>>>>>>>>>>>>> Thanks for the KIP.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I *really* don’t like adding a config which changes the behaviour of the flush() method. We already have too many configs. But I totally understand the problem that you’re trying to solve, and some of the other suggestions in this thread seem neater.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Personally, I would add another method to KafkaProducer. Not an overload on flush(), because this is not flush() at all. Of Matthias’s options, I prefer (3).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Andrew
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On 20 Jun 2024, at 15:08, Lianet M. <liane...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi all, thanks for the KIP Alieh!
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> LM1. Totally agree with Artem's point about the config not being the most explicit/flexible way to express this capability. Turning to Matthias' 4 options, what I don't like about 3 and 4 is that it seems they might not age very well? Aren't we going to want some other twist to the flush semantics that will have us adding yet another param to it, or another overloaded method? I truly don't have the context to answer that, but if it feels like a realistic future, maybe adding some kind of FlushOptions param to flush would be better from an extensibility point of view. It would only have the clearErrors option available for now but could accept any others we may need. I find that this would remove the "ugliness" Matthias pointed out for 3 and 4.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> LM2. No matter how we end up expressing the different semantics for flush, let's make sure we update the flush and commitTransaction Javadocs in the KIP. It currently states that flush "clears the last exception" and commitTransaction "will NOT throw" if called after flush, but it really all depends on the config/options/method used.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> LM3. I think it would be helpful to include an example to show the new flow that we're unblocking (I see this as the great gain here): flush with the clear-error option enabled -> catch and do whatever error handling we want -> commitTransaction successfully.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Lianet
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Wed, Jun 19, 2024 at 11:26 PM Chris Egerton <fearthecel...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I like the alternatives you've listed. One more that might help is if, instead of overloading flush(), we overloaded commitTransaction() to something like commitTransaction(boolean tolerateRecordErrors). This seems slightly cleaner in that it takes the behavioral change we want, which only applies to transactional producers, to an API method that is only used for transactional producers. It would also avoid the issue of whether flush() (or a new variant of it with altered semantics) should throw or not. Thoughts?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Alieh,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks for the KIP, I like this direction a lot more than the pluggable handler!
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I share Artem's concerns that enabling this behavior via configuration doesn't seem like a great fit. It's likely that application code will be written in a style that only works with one type of behavior from transactional producers, so requiring that application code to declare its expectations for the behavior of its producer seems more appropriate than, e.g., allowing users deploying that application to tweak a configuration file that gets fed to producers spun up inside it.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Chris
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Wed, Jun 19, 2024 at 10:32 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks for the KIP, Alieh. I actually like the KIP as-is, but I think Artem raises very good points...
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Seems we have four options on how to move forward?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> 1. add a config to allow "silent error clearance", as the KIP proposes
>>>>>>>>>>>>>>>>>>>> 2. change flush() to clear the error and let it throw
>>>>>>>>>>>>>>>>>>>> 3. add a new `flushAndThrow()` (or better name) which clears the error and throws
>>>>>>>>>>>>>>>>>>>> 4. add `flush(boolean clearAndThrow)` and let the user pick (and deprecate the existing `flush()`)
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> For (2), given that it would be a behavior change, we might also need a public "feature flag" config.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> It seems both (1) and (2) have the issue Artem mentioned. (3) and (4) would be safer to this end; however, for both we kinda get an ugly API?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Not sure right now if I have any preference. Seems we need to pick some evil, and there is no clear best solution? Would be good to hear from others what they think.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On 6/18/24 8:39 PM, Artem Livshits wrote:
>>>>>>>>>>>>>>>>>>>>> Hi Alieh,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thank you for the KIP. I have a couple of suggestions:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> AL1. We should throw an error from flush after we clear it. This would make it so that both "send + commit" and "send + flush + commit" (the latter looks like just a more verbose way to express the former, and it would be intuitive if it behaves the same) would throw if the transaction has an error (so whichever way the code is written, it's going to be correct). At the same time, the latter could be extended by the caller to intercept exceptions from flush, ignore them as needed, and commit the transaction. This solution would keep basic things simple (if someone has code that doesn't require advanced error handling, then basic "send + flush + commit" would do the right thing) and advanced things possible: an application can add try + catch around flush and ignore some errors.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> AL2. I'm not sure if config is the best way to express the modification of the "flush" semantics -- the application logic that calls "flush" needs to match the "flush" semantics, and configuring semantics in a detached place creates room for bugs due to discrepancies. This can be especially bad if the producer loads configuration from a file at run time; in that case a mistake in configuration could break the application, because it was written to expect one "flush" semantics but the semantics were switched. Given that the "flush" semantics need to match the caller's expectation, a way to accomplish that would be to pass the caller's expectation to the "flush" call by either having a method with a different name or having an overload with a Boolean flag that would configure the semantics (the current method could just redirect to the new one).
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> -Artem
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Mon, Jun 17, 2024 at 9:09 AM Alieh Saeedi <asae...@confluent.io.invalid> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I'd like to kick off a discussion for KIP-1059, which suggests adding a new feature to the Producer flush() method.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1059%3A+Enable+the+Producer+flush%28%29+method+to+clear+the+latest+send%28%29+error
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>> Alieh
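Artem's AL1 proposal says flush() clears the recorded send error and then throws it. A toy model can illustrate this. `ToyFlushProducer` and `SendFailedException` are hypothetical. The sketch only shows the intended control flow. Plain "send + flush + commit" still surfaces the error. A caller that wraps `flush()` in try/catch can decide to tolerate the error and then commit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical exception type for a failed send in this toy model.
class SendFailedException extends RuntimeException {
    SendFailedException(String msg) { super(msg); }
}

// Toy model of AL1 semantics; not the real KafkaProducer.
class ToyFlushProducer {
    private final List<String> sent = new ArrayList<>();
    private SendFailedException lastError;

    void send(String record, boolean fails) {
        if (fails) {
            lastError = new SendFailedException("failed: " + record);
        } else {
            sent.add(record);
        }
    }

    // AL1: flush() clears the recorded error and then throws it.
    void flush() {
        SendFailedException e = lastError;
        lastError = null;
        if (e != null) {
            throw e;
        }
    }

    // Without a flush, commit still throws if a send failed, as today.
    List<String> commitTransaction() {
        if (lastError != null) {
            throw lastError;
        }
        return new ArrayList<>(sent);
    }
}

class Al1Demo {
    // Basic usage: send + flush + commit with no special handling; the error propagates.
    static String basic() {
        ToyFlushProducer producer = new ToyFlushProducer();
        producer.send("a", false);
        producer.send("bad", true);
        try {
            producer.flush();
            return "committed " + producer.commitTransaction();
        } catch (SendFailedException e) {
            return "aborted: " + e.getMessage(); // a real app would call abortTransaction() here
        }
    }

    // Advanced usage: catch around flush, decide the error is tolerable, then commit.
    static String tolerant() {
        ToyFlushProducer producer = new ToyFlushProducer();
        producer.send("a", false);
        producer.send("bad", true);
        try {
            producer.flush();
        } catch (SendFailedException e) {
            // application-specific decision to ignore this error
        }
        return "committed " + producer.commitTransaction();
    }

    public static void main(String[] args) {
        System.out.println(basic());    // aborted: failed: bad
        System.out.println(tolerant()); // committed [a]
    }
}
```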