Hi all, thanks for the brilliant ideas. The KIP is updated as follows:
- Regarding the word "latest": I chose this word because the documentation of `commitTransaction()` clearly states that the method throws the latest exception of `send()`. That said, I agree with changing it to "any", since "latest" can be confusing: the user may wonder what happens to the earlier exceptions.
- KIP name: updated to "Enable Producer to resolve send() method errors". Agree?
- The `Map` is replaced by the enum `CommitOption`, as Andrew suggested. Thanks for all the suggestions. I knew `Map` was not the best choice but hesitated to define something new.
- The method Javadocs: updated.

Looking forward to your votes.

Bests,
Alieh

On Wed, Jun 26, 2024 at 10:52 AM Andrew Schofield <andrew_schofi...@live.com> wrote:

> Hi,
> Looking at your suggestions for the CommitOptions, I would be happy with either.
> I definitely prefer either of them to the Map<String, ?> in the KIP.
>
> We also need to think about the other option, where CLEAR_SEND_ERRORS hasn't been
> specified, and leave ourselves space for other options in the future.
>
> If we use an enum, we need to give the existing behaviour a name. Maybe:
>
>   public enum CommitOptions {
>       /**
>        * Commits the ongoing transaction, flushing any unsent records before
>        * actually committing the transaction. If any of the records sent in this transaction
>        * hit unrecoverable errors, the transaction will not be committed.
>        */
>       NONE,
>
>       /**
>        * Commits the ongoing transaction, first clearing any errors from records already sent
>        * in this transaction and then flushing any unsent records before committing the transaction.
>        * If there are any unsent records flushed by this operation which hit unrecoverable errors,
>        * these errors will not be cleared and the transaction will not be committed.
>        * <p>
>        * To ensure there are no unsent records, you must call {@link #flush()} before
>        * committing the transaction.
>        */
>       CLEAR_SEND_ERRORS;
>   }
>
> The Javadoc for commitTransaction will have to be carefully crafted.
>
> Thanks,
> Andrew
>
> > On 26 Jun 2024, at 03:23, Matthias J. Sax <mj...@apache.org> wrote:
> >
> >>> I was also curious about this text:
> >>>> The new method clears the latest error produced by `send(ProducerRecord)` and transits the transaction back from the error state
> >
> > I agree. We should not say "latest" but "any".
> >
> >> Is it fair to say that we expect to encounter only one send response with an error that we clear on commitTransaction? Is that solving the problem?
> >
> > Yes, I think that is an accurate description.
> >
> >> Speaking of documentation, is it confusing that the name of the KIP is no longer consistent with the approach the KIP takes?
> >
> > Sounds like an easy fix (though we should not update the subject of the discuss email thread...)
> >
> >> looks like we said on the KIP that the options will be of the type Map<String, ?> commitOptions. Would we want to define something more specific?
> >
> > I would also suggest to either pass an enum (as preferred by Andrew) or a newly defined class `CommitOptions`.
> >
> > Something like:
> >
> >   public enum CommitOptions {
> >       /**
> >        * Clears any previous send errors before committing the transaction.
> >        * Note: if there are pending sends, errors won't be cleared.
> >        * To avoid pending sends, you need to call {@link #flush()} before committing the transaction.
> >        */
> >       CLEAR_SEND_ERRORS
> >   }
> >
> > Or similar for a class:
> >
> >   public class CommitOptions {
> >       /**
> >        * Clears any previous send errors before committing the transaction.
> >        * Note: if there are pending sends, errors won't be cleared.
> >        * To avoid pending sends, you need to call {@link #flush()} before committing the transaction.
> >        */
> >       public static CommitOptions clearSendErrors();
> >   }
> >
> > Plus:
> >
> >   public class KafkaProducer {
> >
> >       /**
> >        * <same JavaDocs as for commitTransaction()>
> >        *
> >        * This method should only be called if there are no pending writes, i.e., only after calling {@link #flush()}.
> >        * If there are any errors sending messages to topics, these errors can be cleared by passing
> >        * {@link CommitOptions#clearSendErrors}, allowing the transaction to commit even in case of data loss.
> >        *
> >        * If this method is used while there are pending sends, send errors cannot be cleared.
> >        */
> >       public void commitTransaction(CommitOptions options);
> >   }
> >
> > Or something like this -- we can discuss details about JavaDocs on the PR IMHO.
> >
> >> What I really don't like (in any of the options), is that we cannot really document it in a way that articulates a value in the product. There are tons of nuances that require understanding some buggy behavior, then fixed behavior, then an option to sometimes turn on buggy behavior, and etc.
> >
> > I would believe that users don't need to understand the full history of events, and the proposed JavaDocs should go a long way.
> >
> > -Matthias
> >
> > On 6/25/24 5:00 PM, Justine Olshan wrote:
> >> Hey there,
> >> I had a few questions about the update.
> >> Looks like we said on the KIP that the options will be of the type Map<String, ?> commitOptions. Would we want to define something more specific?
> >> I was also curious about this text:
> >>> The new method clears the latest error produced by `send(ProducerRecord)` and transits the transaction back from the error state
> >> Is it fair to say that we expect to encounter only one send response with an error that we clear on commitTransaction? Is that solving the problem?
> >> I think we also need to be really careful about the documentation of this method.
> >> It should be clear that setting this option will not do anything if there is any inflight record when the method is called.
> >> Speaking of documentation, is it confusing that the name of the KIP is no longer consistent with the approach the KIP takes?
> >> Thanks,
> >> Justine
> >>
> >> On Tue, Jun 25, 2024 at 5:08 AM Alieh Saeedi <asae...@confluent.io.invalid> wrote:
> >>> Hi all,
> >>>
> >>> Thanks for maintaining the momentum of our discussion.
> >>>
> >>> I see something of a consensus on the main points. It seems that we agreed on the following:
> >>>
> >>> 1) Define `commitTransaction(commitOptions)` to clear the error.
> >>>
> >>> 2) Make the user explicitly call `flush()` before `commitTransaction(commitOptions)` if they decide to ignore errors.
> >>>
> >>> I updated the KIP with the above-mentioned points. Please take a look. I am sure it is not perfect yet, and there are/will be some open questions, but if you agree, I will open voting as well. Of course, the discussion can still carry on in this thread.
> >>>
> >>> Cheers,
> >>>
> >>> Alieh
> >>>
> >>> On Tue, Jun 25, 2024 at 11:36 AM Chris Egerton <fearthecel...@gmail.com> wrote:
> >>>
> >>>> Hi Artem,
> >>>>
> >>>> Yes, I completely agree that by default, special action shouldn't be required from users to prevent transactions from being committed when one or more records can't be sent. The behavior I was suggesting was only relevant to the new API we're discussing where we allow users to intentionally bypass that logic when invoking commitTransaction.
> >>>>
> >>>> Cheers,
> >>>>
> >>>> Chris
> >>>>
> >>>> On Tue, Jun 25, 2024, 01:44 Artem Livshits <alivsh...@confluent.io.invalid> wrote:
> >>>>
> >>>>> Hey folks,
> >>>>>
> >>>>> Great discussion!
> >>>>>
> >>>>> Re: throwing exceptions from send().
> >>>>> send() is documented to throw KafkaException, so if the application doesn't handle it, it should be a bug. Now, it does have a note that API exceptions wouldn't be thrown, not sure if we have code that relies on that. There is a reason exceptions have classes, they are designed to express a "class of errors" that can be handled, so that we don't have to add a flag or a new method every time we have a new exception to throw. But if there is consensus that it's still too risky (especially if we have examples of code that gets broken), then I agree that we shouldn't do it.
> >>>>>
> >>>>> Re: various ways to communicate semantics change. If we must have 2 different behaviors, I think passing options to "ignore errors" to commitTransaction is probably the most intuitive way to do it. What I really don't like (in any of the options), is that we cannot really document it in a way that articulates a value in the product. There are tons of nuances that require understanding some buggy behavior, then fixed behavior, then an option to sometimes turn on buggy behavior, and etc.
> >>>>>
> >>>>>> if a user invokes Producer::abortTransaction from within a producer callback today
> >>>>>
> >>>>> I think we would get invalid state exception. Which we could probably fix, but even if we supported it, I think it would be good if doing send + commit would lead to aborted transaction without special action from the application -- the simple things should be really simple, any failure during send or commit should abort send + commit sequence without special handling.
> >>>>>
> >>>>> -Artem
> >>>>>
> >>>>> On Mon, Jun 24, 2024 at 6:37 PM Chris Egerton <fearthecel...@gmail.com> wrote:
> >>>>>
> >>>>>> One quick thought: if a user invokes Producer::abortTransaction from within a producer callback today, even in the midst of an ongoing call to Producer::commitTransaction, what is the behavior? Would it be reasonable to support this behavior as a way to allow error handling to take place during implicit flushes, via producer callback?
> >>>>>>
> >>>>>> On Mon, Jun 24, 2024 at 9:21 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>
> >>>>>>> My point is that it does not seem to be safe to allow users to ignore errors with an implicit flush, and I think it's better to only allow it with (i.e., after) an explicit flush().
> >>>>>>>
> >>>>>>> My reasoning is that users should make a decision to ignore errors or not before calling `commitTx()`, but after inspecting all potential send errors. With an implicit flush, users need to "blindly" decide to ignore send errors, because there are pending sends and potential errors are not known yet when calling `commitTx()`.
> >>>>>>>
> >>>>>>>> In the documentation of commitTransaction, we say if any send throws an error, commitTransaction will too.
> >>>>>>>
> >>>>>>> Yes. And I think we should keep it this way for an implicit flush. With an explicit flush, `commitTransaction()` cannot encounter any send errors any longer.
> >>>>>>>
> >>>>>>>> It says that all callbacks will be executed, but we ignore the errors of the callbacks.
> >>>>>>>
> >>>>>>> Ah. Thanks for pointing this out.
> >>>>>>> For this case it's even worse (for case (2)), because the user cannot inspect any errors and make any decision to ignore or not during an implicit flush...
> >>>>>>>
> >>>>>>>> We shouldn't be relying on errors in the callback unless we are calling flush, which we can still do. It seems this has always been the case as well.
> >>>>>>>
> >>>>>>> Yes, it has always been this way, and my point is to keep it this way (option (2) would change it), and not start to allow ignoring errors with an implicit flush.
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>> On 6/24/24 4:57 PM, Justine Olshan wrote:
> >>>>>>>> Transaction verification is a concept from KIP-890 referring to the verification that a partition has been added to the transaction. It's not a huge deal, but maybe we don't want to overload the terminology.
> >>>>>>>>
> >>>>>>>> For option 2, I was a little confused by this
> >>>>>>>>
> >>>>>>>>> when commitTx is called, there is still pending Futures and not all Callbacks are executed yet -- with the implicit flush, we know that all Callbacks are executed, but even for this case, the user could only throw an exception inside the Callback to stop the TX to eventually commit -- Futures cannot be used to make a decision to ignore error and commit or not.
> >>>>>>>>
> >>>>>>>> In the documentation of commitTransaction, we say if any send throws an error, commitTransaction will too.
> >>>>>>>>
> >>>>>>>> *Further, if any of the {@link #send(ProducerRecord)} calls which were part of the transaction hit irrecoverable errors, this method will throw the last received exception immediately and the transaction will not be committed.*
> >>>>>>>>
> >>>>>>>> It says that all callbacks will be executed, but we ignore the errors of the callbacks.
> >>>>>>>>
> >>>>>>>> *If the transaction is committed successfully and this method returns without throwing an exception, it is guaranteed that all {@link Callback callbacks} for records in the transaction will have been invoked and completed. Note that exceptions thrown by callbacks are ignored; the producer proceeds to commit the transaction in any case.*
> >>>>>>>>
> >>>>>>>> Is it fair to say though that for the send errors, we can choose to ignore them? I wasn't understanding where the callbacks come in with your comment. We shouldn't be relying on errors in the callback unless we are calling flush, which we can still do. It seems this has always been the case as well.
> >>>>>>>>
> >>>>>>>> Justine
> >>>>>>>>
> >>>>>>>> On Mon, Jun 24, 2024 at 11:07 AM Andrew Schofield <andrew_schofi...@live.com> wrote:
> >>>>>>>>
> >>>>>>>>> Agreed. Options 1 and 3 are safe. Option 2 is not. I'd be happy with 3a as the way.
> >>>>>>>>>
> >>>>>>>>> I suggest "TRANSACTION VERIFIED".
> >>>>>>>>>
> >>>>>>>>> There isn't really precedent for options in the producer API. We could use an enum, which is easy to use and not very future-proof. Or we could use a class like the admin API does, which is cumbersome and flexible.
> >>>>>>>>>
> >>>>>>>>>   CommitTransactionOptions.TRANSACTION_VERIFIED
> >>>>>>>>>
> >>>>>>>>> or
> >>>>>>>>>
> >>>>>>>>>   public class CommitTransactionOptions {
> >>>>>>>>>       public CommitTransactionOptions();
> >>>>>>>>>
> >>>>>>>>>       CommitTransactionOptions transactionVerified(boolean transactionVerified);
> >>>>>>>>>
> >>>>>>>>>       boolean transactionVerified();
> >>>>>>>>>   }
> >>>>>>>>>
> >>>>>>>>> Then 3b is:
> >>>>>>>>>
> >>>>>>>>>   send(…)
> >>>>>>>>>   send(…)
> >>>>>>>>>   flush()
> >>>>>>>>>   commitTransaction(new CommitTransactionOptions().transactionVerified(true))
> >>>>>>>>>
> >>>>>>>>> I'd tend towards the enum here because I doubt we need as much flexibility as the admin API requires.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Andrew
> >>>>>>>>>
> >>>>>>>>>> On 24 Jun 2024, at 18:39, Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>>
> >>>>>>>>>> I am ok either way (i.e., flush or commit), but I think we need to define exact semantics, and I think there are some subtle things to consider:
> >>>>>>>>>>
> >>>>>>>>>> 1) flush(Options)
> >>>>>>>>>>
> >>>>>>>>>> Example:
> >>>>>>>>>>
> >>>>>>>>>>   send(...)
> >>>>>>>>>>   send(...)
> >>>>>>>>>>
> >>>>>>>>>>   flush(ignoreErrors)
> >>>>>>>>>>
> >>>>>>>>>>   // at this point, we know that all Futures are completed and all Callbacks are executed, and we can assume that all user code checking for errors did execute, before `commitTx` is called
> >>>>>>>>>>
> >>>>>>>>>>   // I consider this option as safe
> >>>>>>>>>>
> >>>>>>>>>>   commitTx()
> >>>>>>>>>>
> >>>>>>>>>> 2) commitTx(Option)
> >>>>>>>>>>
> >>>>>>>>>> Example:
> >>>>>>>>>>
> >>>>>>>>>>   send(...)
> >>>>>>>>>>   send(...)
> >>>>>>>>>>
> >>>>>>>>>>   // when commitTx is called, there are still pending Futures and not all Callbacks are executed yet -- with the implicit flush, we know that all Callbacks are executed, but even for this case, the user could only throw an exception inside the Callback to stop the TX to eventually commit -- Futures cannot be used to make a decision to ignore error and commit or not.
> >>>>>>>>>>
> >>>>>>>>>>   // I consider this option not as safe
> >>>>>>>>>>
> >>>>>>>>>>   commitTx(ignoreErrors)
> >>>>>>>>>>
> >>>>>>>>>> 3a) required flush + commitTx(Option)
> >>>>>>>>>>
> >>>>>>>>>> Example:
> >>>>>>>>>>
> >>>>>>>>>>   send(...)
> >>>>>>>>>>   send(...)
> >>>>>>>>>>
> >>>>>>>>>>   flush()
> >>>>>>>>>>
> >>>>>>>>>>   // at this point, we know that all Futures are completed and all Callbacks are executed, and we can assume that all user code checking for errors did execute, before `commitTx` is called
> >>>>>>>>>>
> >>>>>>>>>>   // I consider this option as safe
> >>>>>>>>>>
> >>>>>>>>>>   commitTx(ignoreErrors)
> >>>>>>>>>>
> >>>>>>>>>> 3b) missing flush + commitTx(Option)
> >>>>>>>>>>
> >>>>>>>>>> Example:
> >>>>>>>>>>
> >>>>>>>>>>   send(...)
> >>>>>>>>>>   send(...)
> >>>>>>>>>>
> >>>>>>>>>>   // as flush() was not called explicitly, we should ignore the `ignoreErrors` flag and always throw an exception if the producer is in error state, because we cannot be sure that the user did all required checks for error handling
> >>>>>>>>>>
> >>>>>>>>>>   commitTx(ignoreErrors)
> >>>>>>>>>>
> >>>>>>>>>> The only issue with option (3) is that it's more complex and the semantics are more subtle. But it might be a good (necessary?) bridge between (1) and (2): (3) is semantically sound (we ignore errors via passing a flag into commitTx() instead of flush()), and at the same time safe (we force users to explicitly flush() and [hopefully] do proper error handling, and don't rely on an implicit flush() during commitTx(), which might be error prone).
> >>>>>>>>>>
> >>>>>>>>>> (Also need to find a good and descriptive name for the flag we pass into `commitTx()` for this case.)
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>> On 6/24/24 8:51 AM, Andrew Schofield wrote:
> >>>>>>>>>>> Hi Chris,
> >>>>>>>>>>> That works for me too. I slightly prefer an option on flush(), but what you suggested works too.
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>> Andrew
> >>>>>>>>>>>> On 24 Jun 2024, at 15:14, Chris Egerton <chr...@aiven.io.INVALID> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Hi Andrew,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I like a lot of what you said, but I still believe it's better to overload commitTransaction than flush. Users will already have to manually opt in to ignoring errors encountered during transactions, and we can document recommended usage (i.e., explicitly invoking flush() before invoking commitTransaction(ignoreRecordErrors)) in the newly-introduced method. I don't believe it's worth the increased cognitive load on users with non-transactional producers to introduce an overloaded flush() variant.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Chris
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Mon, Jun 24, 2024 at 9:39 AM Andrew Schofield <andrew_schofi...@live.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Alieh,
> >>>>>>>>>>>>> Thanks for driving this. Unfortunately, there are many parts of the API which are a bit unfortunate and it's tricky to make small improvements that don't have downsides.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I don't like the idea of using a configuration because configuration is often outside the application and changing the behaviour of someone else's application without understanding it is risky. Anything which embeds a transactional producer could have its behaviour changed unexpectedly.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> It would have been much nicer if send() didn't fail silently and change the transaction state. But, because it's an asynchronous operation, I don't really think we can just make it throw all exceptions, even though I really think that `send()` is the method with the problem here.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The contract of `flush()` is that it makes sure that all preceding sends will have completed, so it should be true that a well written application would be able to know which records were OK because of the Future<RecordMetadata> returned by the `send()` method. It should be able to determine whether it wants to commit the transaction even if some of the intended operations didn't succeed.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> What we don't currently have is a way for the application to say to the KafkaProducer that it knows the outcome of sending the records and to confirm that it wants to proceed. Then it would not be necessary for `commitTransaction()` to throw an exception to report a historical error which the application might choose to ignore.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Having read the comments, I think the KIP is on the right lines focusing on the `flush()` method. My suggestion is that we introduce an option on `flush()` to be used before `commitTransaction()` for applications that want to be able to commit transactions which had known failed operations.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The code would be:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>   producer.beginTransaction();
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>   future1 = producer.send(goodRecord1);
> >>>>>>>>>>>>>   future2 = producer.send(badRecord); // The future from this call will complete exceptionally
> >>>>>>>>>>>>>   future3 = producer.send(goodRecord2);
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>   producer.flush(FlushOption.TRANSACTION_READY);
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>   // At this point, we know that all 3 futures are complete and the transaction contains 2 records
> >>>>>>>>>>>>>   producer.commitTransaction();
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I wouldn't deprecate `flush()` with no option. It just uses the default option which behaves like today.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Why did I suggest an option on `flush()` rather than `commitTransaction()`? Because with `flush()`, it's clear when the application is stating that it's seen all of the results from its `send()` calls and it's ready to proceed. If it has to rely on flushing that occurs inside `commitTransaction()`, I don't see it as being as clear-cut.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>> Andrew
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 24 Jun 2024, at 13:44, Alieh Saeedi <asae...@confluent.io.INVALID> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>> Thanks for the interesting discussion.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I assume that now the main questions are as follows:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1. Do we need to transition the transaction to the error state for API exceptions?
> >>>>>>>>>>>>>> 2. Should we throw the API exception in `send()` instead of returning a future error?
> >>>>>>>>>>>>>> 3. If the answer to question (1) is NO and to question (2) is YES, do we need to change the current `flush` or `commitTnx` at all?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>> Alieh
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Sat, Jun 22, 2024 at 3:21 AM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hey Kirk,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> can you elaborate on a few points?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Otherwise users would have to know to explicitly change their code to invoke flush().
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Why? If we would add an option to `flush(FlushOption)`, the existing `flush()` w/o any option will still be there, right? If we would really deprecate existing `flush()`, it would just mean that we would pass "default FlushOption" into an implicit flush (and yes, we would need to define what this would be).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I think there is no clear winner (as pointed out in my last reply), and both `flush(FlushOption)` and `commitTx(CommitOption)` have advantages and drawbacks. Guess we need to just agree on which tradeoff we want to move forward with?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Not sure if your database example is a 1:1 fit? I think the better comparison would be:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>   BEGIN TX;
> >>>>>>>>>>>>>>>   INSERT INTO foo VALUES ('a');
> >>>>>>>>>>>>>>>   INSERT INTO foo VALUES ('b');
> >>>>>>>>>>>>>>>   INSERT INTO foo VALUES ('c');
> >>>>>>>>>>>>>>>   INSERT INTO foo VALUES ('not sure');
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> For this case, the full TX would roll back, right? I still think that allowing users to just skip over the last error, and continue the TX would be ok. In the end, we provide a programmatic API, and not a declarative one as SQL. Of course, default behavior would still be to put the producer into error state, and the user would need to call `abortTransaction()` to move forward.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 6/21/24 5:26 PM, Kirk True wrote:
> >>>>>>>>>>>>>>>> Hi Matthias,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Jun 21, 2024, at 12:28 PM, Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> If we want to limit it to `RecordTooLargeException`, throwing from `send()` directly makes sense. Thanks for calling it out.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> It's still a question of backward compatibility? `send()` does throw exceptions already, including generic `KafkaException`. Not sure if this helps with backward compatibility? Could we just add a new exception type (which is a child of `KafkaException`)?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> The Producer JavaDocs are not totally explicit about it IMHO.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I think we could expect that some generic error handling path gets executed. For the TX-case, I would assume that a TX would be aborted if `send()` throws or that the producer would be `closed()`. Overall this might be safe?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> It would be a little less flexible though, since (as you note) it would still be impossible to commit transactions after errors have been reported from brokers.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> KS would still need a way to clear the error state of the producer.
> >>>>>>>>>>>>>>>>> We could catch a `RecordTooLargeException` from `send()`, call the handler and let it decide what to do next. But if it does return `CONTINUE` to swallow the error and drop the poison pill record on the floor, we would want to move forward and commit the transaction.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> But the question is: if we cannot add a record to the tx, does the producer need to go into error state? In the end, we did throw and inform the app that the record was _not_ added, and it's up to the app to decide what to do next?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> That's an excellent question…
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Imagine the user's application is writing information to a database instead of Kafka. If there's a table with a CHAR(1) column and this SQL statement was attempted, what should happen?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>   INSERT INTO foo VALUES ('not sure');
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Yes, that DML would fail, sure, but would the user expect that the connection used by the database library would get stuck in some kind of error state? A user would be able to catch the error and either continue or abort, based on their business rules.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> So I agree with what I believe you’re implying: we shouldn’t poison the Producer/TransactionManager on certain types of application-level errors in send().
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Kirk
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> If we report the error only via the `Callback` it's a different story, because the contract for this case is clearly specified on the JavaDocs:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> When used as part of a transaction, it is not necessary to define a callback or check the result of the future
> >>>>>>>>>>>>>>>>>> in order to detect errors from <code>send</code>. If any of the send calls failed with an irrecoverable error,
> >>>>>>>>>>>>>>>>>> the final {@link #commitTransaction()} call will fail and throw the exception from the last failed send. When
> >>>>>>>>>>>>>>>>>> this happens, your application should call {@link #abortTransaction()} to reset the state and continue to send
> >>>>>>>>>>>>>>>>>> data.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On 6/21/24 11:42 AM, Chris Egerton wrote:
> >>>>>>>>>>>>>>>>>> Hi Artem,
> >>>>>>>>>>>>>>>>>> I think it'd make sense to throw directly from send whenever possible, instead of returning an already-completed future. I didn't do that in my bug fix to try to be conservative about breaking changes but this seems to have caused its own set of headaches.
> >>>>>>>>>>>>>>>>>> It would be a little less flexible though, since (as you note) it would still be impossible to commit transactions after errors have been reported from brokers.
> >>>>>>>>>>>>>>>>>> I'll leave it up to the Kafka Streams folks to decide if that flexibility is required. If it is, then users could explicitly call flush() before committing (and ignoring errors for) or aborting a transaction, if they want to implement fine-grained error handling logic such as allowing errors for a subset of topics to be ignored.
> >>>>>>>>>>>>>>>>>> Hi Matthias,
> >>>>>>>>>>>>>>>>>> Most of the time you're right and we can't throw from send(); however, in this case (client-side record-too-large exception), the error is actually noticed by the producer before send() returns, so it should be possible to throw directly.
> >>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>> Chris
> >>>>>>>>>>>>>>>>>> On Fri, Jun 21, 2024, 14:25 Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>>>>>>>>>>> Not sure if we can change send and make it throw, given that send() is async? That is why users can register a `Callback` to begin with, right?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> And Alieh's point about backward compatibility is also a fair concern.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Actually, this would potentially be even worse than the original buggy behavior because the bug was that we ignored errors that happened in the "send()" method itself, not necessarily the ones that we got from the broker.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> My understanding was that `commitTx(swallowError)` would only swallow `send()` errors, not errors about the actual commit. I agree that it would be very bad to swallow errors about the actual tx commit...
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> It's a fair question if this might be too subtle; to make it explicit, we could use `CommitOptions#ignorePendingSendErrors()` [working name] to make it clear.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> If we think it's too subtle to change commit to swallow send() errors, maybe going with `flush(FlushOptions)` would be clearer (and we can use `FlushOption#swallowSendErrorsForTransactions()` [working name] to be explicit that the `FlushOption` for now has only an effect for TX).
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thoughts?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On 6/21/24 4:10 AM, Alieh Saeedi wrote:
> >>>>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> It is very exciting to see all the experts here raising very good points.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> As we go further, we see more and more options to improve our solution, which makes concluding and updating the KIP impossible.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> The main suggestions so far are:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 1. `flush` with `flushOptions` as input parameter
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 2. `commitTx` with `commitOptions` as input parameter
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 3. `send` must throw the exception
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> My concern about the 3rd suggestion:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 1. Does the change cause any issue with backward compatibility?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 2. The `send (bad record)` already transitions the transaction to the error state. No user, including Streams, is able to transition the transaction back from the error state.
> >>>>>>>>>>>>>>>>>>>> Do you mean we remove the `maybeTransitionToErrorState(e)` from here
> >>>>>>>>>>>>>>>>>>>> <https://github.com/apache/kafka/blob/9b5b434e2a6b2d5290ea403fc02859b1c523d8aa/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1112>
> >>>>>>>>>>>>>>>>>>>> as well?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>>> Alieh
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Fri, Jun 21, 2024 at 8:45 AM Andrew Schofield <andrew_schofi...@live.com> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Hi Artem,
> >>>>>>>>>>>>>>>>>>>>> I think you make a good point which is worth further consideration. If any of the existing methods is really ripe for a change here, it’s the send() that actually caused the problem. If that can be fixed so there are no situations in which a lurking error breaks a transaction, that might be the best.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>> Andrew
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On 21 Jun 2024, at 01:51, Artem Livshits <alivsh...@confluent.io.INVALID> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I thought we still wait for requests (and their errors) to come in and could handle fatal errors appropriately.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> We do wait for requests, but my understanding is that when we call commitTransaction("ignore send errors") we want to ignore errors. So if we do
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 1. send
> >>>>>>>>>>>>>>>>>>>>>> 2. commitTransaction("ignore send errors")
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> the commit will succeed. You can look at the example in https://issues.apache.org/jira/browse/KAFKA-9279 and just substitute commitTransaction with commitTransaction("ignore send errors") and we get the buggy behavior back :-). Actually, this would potentially be even worse than the original buggy behavior because the bug was that we ignored errors that happened in the "send()" method itself, not necessarily the ones that we got from the broker.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Actually, looking at https://github.com/apache/kafka/pull/11508/files, wouldn't a better solution be to just throw the error from the "send" method itself, rather than trying to set it to be thrown during commit?
> >>>>>>>>>>>>>>>>>>>>>> This way the example in https://issues.apache.org/jira/browse/KAFKA-9279 would be fixed, and at the same time it would give an opportunity for KS to catch the error and ignore it if needed. Not sure if we need a KIP for that, just do a better fix of the old bug.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> -Artem
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On Thu, Jun 20, 2024 at 4:58 PM Justine Olshan <jols...@confluent.io.invalid> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I'm a bit late to the party, but the discussion here looks reasonable. Moving the logic to a transactional method makes sense to me and makes me feel a bit better about keeping the complexity in the methods relevant to the issue.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> One minor concern is that if we set "ignore send errors" (or whatever we decide to name it) option without explicit flush, it'll actually lead to broken behavior as the application won't be able to stop a commit from proceeding even on fatal errors.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Is this with respect to the case a request is still inflight when we call commitTransaction? I thought we still wait for requests (and their errors) to come in and could handle fatal errors appropriately.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Justine
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On Thu, Jun 20, 2024 at 4:32 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Hi Matthias (and other folks who suggested ideas),
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> maybe `commitTransaction(CommitOptions)` or similar could be a good way forward?
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> I like this approach. One minor concern is that if we set "ignore send errors" (or whatever we decide to name it) option without explicit flush, it'll actually lead to broken behavior as the application won't be able to stop a commit from proceeding even on fatal errors. But I guess we'll just have to clearly document it.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> In some way we are basically adding a flag to optionally restore the https://issues.apache.org/jira/browse/KAFKA-9279 bug, which is the motivation for all these changes, anyway :-).
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> -Artem
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> On Thu, Jun 20, 2024 at 2:18 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Seems the option to use a config does not get a lot of support.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> So we need to go with some form of "overload / new method". I think Chris' point about not coupling it to `flush()` but rather `commitTransaction()` is actually a very good one; for non-tx case, the different flush variants would not make sense.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> I also like Lianet's idea to pass in some "options" object, so maybe `commitTransaction(CommitOptions)` or similar could be a good way forward? It's much better than a `boolean` parameter, aesthetically, as well as extendable in the future if necessary.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Given that we would pass in an optional parameter, we might not even need to deprecate the existing `commitTransaction()` method?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> On 6/20/24 9:12 AM, Andrew Schofield wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>> Hi Alieh,
> >>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> I *really* don’t like adding a config which changes the behaviour of the flush() method. We already have too many configs. But I totally understand the problem that you’re trying to solve and some of the other suggestions in this thread seem neater.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Personally, I would add another method to KafkaProducer. Not an overload on flush() because this is not flush() at all. Using Matthias’s options, I prefer (3).
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>>>>>> Andrew
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> On 20 Jun 2024, at 15:08, Lianet M. <liane...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Hi all, thanks for the KIP Alieh!
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> LM1. Totally agree with Artem's point about the config not being the most explicit/flexible way to express this capability. Getting then to Matthias's 4 options, what I don't like about 3 and 4 is that it seems they might not age very well? Aren't we going to be wanting some other twist to the flush semantics that will have us adding yet another param to it, or another overloaded method? I truly don't have the context to answer that, but if it feels like a realistic future maybe adding some kind of FlushOptions param to the flush would be better from an extensibility point of view. It would only have the clearErrors option available for now but could accept any other we may need. I find that this would remove the "ugliness" Matthias pointed out for 3. and 4.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> LM2. No matter how we end up expressing the different semantics for flush, let's make sure we update the KIP on the flush and commitTransaction java docs. It currently states that flush "clears the last exception" and commitTransaction "will NOT throw" if called after flush, but it really all depends on the config/options/method used.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> LM3. I find it would be helpful to include an example to show the new flow that we're unblocking (I see this as the great gain here): flush with clear error option enabled -> catch and do whatever error handling we want -> commitTransaction successfully
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks!
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Lianet
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jun 19, 2024 at 11:26 PM Chris Egerton <fearthecel...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Matthias,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> I like the alternatives you've listed.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> One more that might help is if, instead of overloading flush(), we overloaded commitTransaction() to something like commitTransaction(boolean tolerateRecordErrors). This seems slightly cleaner in that it takes the behavioral change we want, which only applies to transactional producers, to an API method that is only used for transactional producers. It would also avoid the issue of whether or not flush() (or a new variant of it with altered semantics) should throw or not. Thoughts?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Alieh,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP, I like this direction a lot more than the pluggable handler!
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> I share Artem's concerns that enabling this behavior via configuration doesn't seem like a great fit.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> It's likely that application code will be written in a style that only works with one type of behavior from transactional producers, so requiring that application code to declare its expectations for the behavior of its producer seems more appropriate than, e.g., allowing users deploying that application to tweak a configuration file that gets fed to producers spun up inside it.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Chris
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jun 19, 2024 at 10:32 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP Alieh. I actually like the KIP as-is, but think Artem raises very good points...
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Seems we have four options on how to move forward?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>   1. add config to allow "silent error clearance" as the KIP proposes
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>   2. change flush() to clear error and let it throw
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>   3. add new `flushAndThrow()` (or better name) which clears error and throws
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>   4. add `flush(boolean clearAndThrow)` and let user pick (and deprecate existing `flush()`)
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> For (2), given that it would be a behavior change, we might also need a public "feature flag" config.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> It seems, both (1) and (2) have the issue Artem mentioned. (3) and (4) would be safer to this end, however, for both we kinda get an ugly API?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Not sure right now if I have any preference. Seems we need to pick some evil and that there is no clear best solution?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Would be good to hear from others what they think
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 6/18/24 8:39 PM, Artem Livshits wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Alieh,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thank you for the KIP. I have a couple of suggestions:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> AL1. We should throw an error from flush after we clear it. This would make it so that both "send + commit" and "send + flush + commit" (the latter looks like just a more verbose way to express the former, and it would be intuitive if it behaves the same) would throw if the transaction has an error (so if the code is written either way it's going to be correct). At the same time, the latter could be extended by the caller to intercept exceptions from flush, ignore as needed, and commit the transaction.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This solution would keep basic things simple (if someone has code that doesn't require advanced error handling, then basic "send + flush + commit" would do the right thing) and advanced things possible: an application can add try + catch around flush and ignore some errors.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> AL2. I'm not sure if config is the best way to express the modification of the "flush" semantics -- the application logic that calls "flush" needs to match the "flush" semantics and configuring semantics in a detached place creates room for bugs due to discrepancies.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This can be especially bad if the producer loads configuration from a file at run time; in that case a mistake in configuration could break the application because it was written to expect one "flush" semantics but the semantics is switched. Given that the "flush" semantics needs to match the caller's expectation, a way to accomplish that would be to pass the caller's expectation to the "flush" call by either having a method with a different name or having an overload with a Boolean flag that would configure the semantics (the current method could just redirect to the new one).
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -Artem
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Jun 17, 2024 at 9:09 AM Alieh Saeedi <asae...@confluent.io.invalid> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I'd like to kick off a discussion for KIP-1059 that suggests adding a new feature to the Producer flush() method.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1059%3A+Enable+the+Producer+flush%28%29+method+to+clear+the+latest+send%28%29+error
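
For readers skimming the thread, the commit-time semantics debated above can be sketched with a toy model. This is only an illustration under stated assumptions, not the Kafka client and not the KIP's implementation: `ToyTxProducer`, `maxRecordSize` and `CommitOptionDemo` are hypothetical names, records are plain strings, and there is no broker, batching or in-flight request. Only the enum constants `NONE` and `CLEAR_SEND_ERRORS` are taken from the discussion.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: none of these classes are the Kafka client API.
enum CommitOption { NONE, CLEAR_SEND_ERRORS }

class ToyTxProducer {
    private final int maxRecordSize;                 // hypothetical size limit, in characters
    private final List<String> pending = new ArrayList<>();
    private final List<String> committed = new ArrayList<>();
    private RuntimeException sendError;              // most recent send() failure, if any

    ToyTxProducer(int maxRecordSize) {
        this.maxRecordSize = maxRecordSize;
    }

    // Buffers the record; an oversized record is remembered as an error instead of thrown.
    void send(String record) {
        if (record.length() > maxRecordSize) {
            sendError = new IllegalArgumentException("record too large: " + record.length());
            return;
        }
        pending.add(record);
    }

    // NONE: a remembered send error fails the commit.
    // CLEAR_SEND_ERRORS: the remembered error is dropped before committing.
    void commitTransaction(CommitOption option) {
        if (option == CommitOption.CLEAR_SEND_ERRORS) {
            sendError = null;
        }
        if (sendError != null) {
            throw new IllegalStateException("commit failed due to an earlier send error", sendError);
        }
        committed.addAll(pending);
        pending.clear();
    }

    // Discards buffered records and resets the error state.
    void abortTransaction() {
        pending.clear();
        sendError = null;
    }

    List<String> committed() {
        return committed;
    }
}

class CommitOptionDemo {
    public static void main(String[] args) {
        ToyTxProducer producer = new ToyTxProducer(8);
        producer.send("ok");
        producer.send("poison-pill-record");          // too large, recorded as a send error
        try {
            producer.commitTransaction(CommitOption.NONE);
        } catch (IllegalStateException e) {
            System.out.println("NONE: " + e.getMessage());
        }
        producer.commitTransaction(CommitOption.CLEAR_SEND_ERRORS);
        System.out.println("committed: " + producer.committed());
    }
}
```

The sketch deliberately leaves out the case Artem and Justine discuss, where a request is still in flight at commit time; modelling that would need an explicit flush step and is exactly where the real semantics get subtle.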