Hi Andrew,

I like a lot of what you said, but I still believe it's better to overload commitTransaction() than flush(). Users will already have to manually opt in to ignoring errors encountered during transactions, and we can document the recommended usage (i.e., explicitly invoking flush() before invoking commitTransaction(ignoreRecordErrors)) in the newly-introduced method. I don't believe it's worth the increased cognitive load on users with non-transactional producers to introduce an overloaded flush() variant.
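To make the recommended usage concrete, here is a minimal sketch of the call order. Everything in it is an assumption for illustration: FakeTransactionalProducer is an in-memory stand-in and not KafkaProducer, commitTransaction(boolean ignoreRecordErrors) is only the overload proposed in this thread and does not exist today, and the "at most one failed record" rule is a made-up application policy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CommitOverloadSketch {

    /** Hypothetical in-memory stand-in for a transactional producer; NOT the real KafkaProducer. */
    static class FakeTransactionalProducer {
        private final int maxRecordSize;
        private final List<String> pending = new ArrayList<>();
        private final List<String> committed = new ArrayList<>();
        private RuntimeException lastSendError;

        FakeTransactionalProducer(int maxRecordSize) {
            this.maxRecordSize = maxRecordSize;
        }

        void beginTransaction() {
            pending.clear();
            lastSendError = null;
        }

        /** Oversized records fail their future and are remembered as the last send error. */
        CompletableFuture<String> send(String record) {
            CompletableFuture<String> future = new CompletableFuture<>();
            if (record.length() > maxRecordSize) {
                lastSendError = new IllegalArgumentException("record too large: " + record.length());
                future.completeExceptionally(lastSendError);
            } else {
                pending.add(record);
                future.complete(record);
            }
            return future;
        }

        /** No-op here because this fake completes every send synchronously. */
        void flush() {
        }

        /** Existing behaviour: a remembered send error makes the commit fail. */
        void commitTransaction() {
            commitTransaction(false);
        }

        /** Hypothetical overload from this thread: the caller opts in to ignoring record errors. */
        void commitTransaction(boolean ignoreRecordErrors) {
            if (lastSendError != null && !ignoreRecordErrors) {
                throw new IllegalStateException("transaction has a failed send", lastSendError);
            }
            committed.addAll(pending);
            pending.clear();
            lastSendError = null;
        }

        void abortTransaction() {
            pending.clear();
            lastSendError = null;
        }

        List<String> committed() {
            return committed;
        }
    }

    /** Recommended order: send, flush, inspect the futures, then commit while ignoring record errors. */
    public static List<String> run() {
        FakeTransactionalProducer producer = new FakeTransactionalProducer(8);
        producer.beginTransaction();

        List<CompletableFuture<String>> futures = new ArrayList<>();
        futures.add(producer.send("good-1"));
        futures.add(producer.send("this-record-is-too-large"));
        futures.add(producer.send("good-2"));

        // Explicit flush first, so every future is complete before we decide.
        producer.flush();

        long failed = futures.stream().filter(CompletableFuture::isCompletedExceptionally).count();
        if (failed <= 1) {
            producer.commitTransaction(true);   // made-up policy: tolerate one bad record
        } else {
            producer.abortTransaction();
        }
        return producer.committed();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The point of the sketch is only the ordering: the application looks at its own futures after flush() and then states its decision on the transactional method, so non-transactional users never see a new flush() variant.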
Cheers,
Chris

On Mon, Jun 24, 2024 at 9:39 AM Andrew Schofield <andrew_schofi...@live.com> wrote:

> Hi Alieh,
> Thanks for driving this. Unfortunately, there are many parts of the API which are a bit unfortunate and it’s tricky to make small improvements that don’t have downsides.
>
> I don’t like the idea of using a configuration because configuration is often outside the application and changing the behaviour of someone else’s application without understanding it is risky. Anything which embeds a transactional producer could have its behaviour changed unexpectedly.
>
> It would have been much nicer if send() didn’t fail silently and change the transaction state. But, because it’s an asynchronous operation, I don’t really think we can just make it throw all exceptions, even though I really think that `send()` is the method with the problem here.
>
> The contract of `flush()` is that it makes sure that all preceding sends will have completed, so it should be true that a well-written application would be able to know which records were OK because of the Future<RecordMetadata> returned by the `send()` method. It should be able to determine whether it wants to commit the transaction even if some of the intended operations didn’t succeed.
>
> What we don’t currently have is a way for the application to say to the KafkaProducer that it knows the outcome of sending the records and to confirm that it wants to proceed. Then it would not be necessary for `commitTransaction()` to throw an exception to report a historical error which the application might choose to ignore.
>
> Having read the comments, I think the KIP is on the right lines focusing on the `flush()` method. My suggestion is that we introduce an option on `flush()` to be used before `commitTransaction()` for applications that want to be able to commit transactions which had known failed operations.
>
> The code would be:
>
> producer.beginTransaction();
>
> future1 = producer.send(goodRecord1);
> future2 = producer.send(badRecord); // The future from this call will complete exceptionally
> future3 = producer.send(goodRecord2);
>
> producer.flush(FlushOption.TRANSACTION_READY);
>
> // At this point, we know that all 3 futures are complete and the transaction contains 2 records
> producer.commitTransaction();
>
> I wouldn’t deprecate `flush()` with no option. It just uses the default option which behaves like today.
>
> Why did I suggest an option on `flush()` rather than `commitTransaction()`? Because with `flush()`, it’s clear when the application is stating that it’s seen all of the results from its `send()` calls and it’s ready to proceed. If it has to rely on flushing that occurs inside `commitTransaction()`, I don’t see it’s as clear-cut.
>
> Thanks,
> Andrew
>
> > On 24 Jun 2024, at 13:44, Alieh Saeedi <asae...@confluent.io.INVALID> wrote:
> >
> > Hi all,
> > Thanks for the interesting discussion.
> >
> > I assume that now the main questions are as follows:
> >
> > 1. Do we need to transition the transaction to the error state for API exceptions?
> > 2. Should we throw the API exception in `send()` instead of returning a future error?
> > 3. If the answer to question (1) is NO and to question (2) is YES, do we need to change the current `flush` or `commitTnx` at all?
> >
> > Cheers,
> > Alieh
> >
> > On Sat, Jun 22, 2024 at 3:21 AM Matthias J. Sax <mj...@apache.org> wrote:
> >
> >> Hey Kirk,
> >>
> >> can you elaborate on a few points?
> >>
> >>> Otherwise users would have to know to explicitly change their code to invoke flush().
> >>
> >> Why? If we would add an option to `flush(FlushOption)`, the existing `flush()` w/o any option will still be there, right?
> >> If we would really deprecate existing `flush()`, it would just mean that we would pass "default FlushOption" into an implicit flush (and yes, we would need to define what this would be).
> >>
> >> I think there is no clear winner (as pointed out in my last reply), and both `flush(FlushOption)` and `commitTx(CommitOption)` have advantages and drawbacks. Guess we need to just agree on which tradeoff we want to move forward with?
> >>
> >> Not sure if your database example is a 1:1 fit? I think, the better comparison would be:
> >>
> >> BEGIN TX;
> >> INSERT INTO foo VALUES ('a');
> >> INSERT INTO foo VALUES ('b');
> >> INSERT INTO foo VALUES ('c');
> >> INSERT INTO foo VALUES ('not sure');
> >>
> >> For this case, the full TX would roll back, right? I still think that allowing users to just skip over the last error, and continue the TX would be ok. In the end, we provide a programmatic API, and not a declarative one as SQL. Of course, default behavior would still be to put the producer into error state, and the user would need to call `abortTransaction()` to move forward.
> >>
> >> -Matthias
> >>
> >> On 6/21/24 5:26 PM, Kirk True wrote:
> >>> Hi Matthias,
> >>>
> >>>> On Jun 21, 2024, at 12:28 PM, Matthias J. Sax <mj...@apache.org> wrote:
> >>>>
> >>>> If we want to limit it to `RecordTooLargeException`, throwing from `send()` directly makes sense. Thanks for calling it out.
> >>>>
> >>>> It's still a question of backward compatibility? `send()` does throw exceptions already, including generic `KafkaException`. Not sure if this helps with backward compatibility? Could we just add a new exception type (which is a child of `KafkaException`)?
> >>>>
> >>>> The Producer JavaDocs are not totally explicit about it IMHO.
> >>>>
> >>>> I think we could expect that some generic error handling path gets executed.
> >>>> For the TX-case, I would assume that a TX would be aborted if `send()` throws or that the producer would be `closed()`. Overall this might be safe?
> >>>>
> >>>>> It would be a little less flexible though, since (as you note) it would still be impossible to commit transactions after errors have been reported from brokers.
> >>>>
> >>>> KS would still need a way to clear the error state of the producer. We could catch a `RecordTooLargeException` from `send()`, call the handler and let it decide what to do next. But if it does return `CONTINUE` to swallow the error and drop the poison pill record on the floor, we would want to move forward and commit the transaction.
> >>>>
> >>>> But the question is: if we cannot add a record to the tx, does the producer need to go into error state? In the end, we did throw and inform the app that the record was _not_ added, and it's up to the app to decide what to do next?
> >>>
> >>> That’s an excellent question…
> >>>
> >>> Imagine the user’s application is writing information to a database instead of Kafka. If there’s a table with a CHAR(1) column and this SQL statement was attempted, what should happen?
> >>>
> >>> INSERT INTO foo VALUES ('not sure');
> >>>
> >>> Yes, that DML would fail, sure, but would the user expect that the connection used by the database library would get stuck in some kind of error state? A user would be able to catch the error and either continue or abort, based on their business rules.
> >>>
> >>> So I agree with what I believe you’re implying: we shouldn’t poison the Producer/TransactionManager on certain types of application-level errors in send().
> >>>
> >>> Kirk
> >>>
> >>>> If we report the error only via the `Callback` it's a different story, because the contract for this case is clearly specified on the JavaDocs:
> >>>>
> >>>>> When used as part of a transaction, it is not necessary to define a callback or check the result of the future in order to detect errors from <code>send</code>. If any of the send calls failed with an irrecoverable error, the final {@link #commitTransaction()} call will fail and throw the exception from the last failed send. When this happens, your application should call {@link #abortTransaction()} to reset the state and continue to send data.
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 6/21/24 11:42 AM, Chris Egerton wrote:
> >>>>> Hi Artem,
> >>>>> I think it'd make sense to throw directly from send whenever possible, instead of returning an already-completed future. I didn't do that in my bug fix to try to be conservative about breaking changes but this seems to have caused its own set of headaches. It would be a little less flexible though, since (as you note) it would still be impossible to commit transactions after errors have been reported from brokers.
> >>>>> I'll leave it up to the Kafka Streams folks to decide if that flexibility is required. If it is, then users could explicitly call flush() before committing (and ignoring errors for) or aborting a transaction, if they want to implement fine-grained error handling logic such as allowing errors for a subset of topics to be ignored.
> >>>>> Hi Matthias,
> >>>>> Most of the time you're right and we can't throw from send(); however, in this case (client-side record-too-large exception), the error is actually noticed by the producer before send() returns, so it should be possible to throw directly.
> >>>>> Cheers,
> >>>>> Chris
> >>>>> On Fri, Jun 21, 2024, 14:25 Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>> Not sure if we can change send and make it throw, given that send() is async? That is why users can register a `Callback` to begin with, right?
> >>>>>>
> >>>>>> And Alieh's point about backward compatibility is also a fair concern.
> >>>>>>
> >>>>>>> Actually, this would potentially be even worse than the original buggy behavior because the bug was that we ignored errors that happened in the "send()" method itself, not necessarily the ones that we got from the broker.
> >>>>>>
> >>>>>> My understanding was that `commitTx(swallowError)` would only swallow `send()` errors, not errors about the actual commit. I agree that it would be very bad to swallow errors about the actual tx commit...
> >>>>>>
> >>>>>> It's a fair question if this might be too subtle; to make it explicit, we could use `CommitOptions#ignorePendingSendErrors()` [working name] to make it clear.
> >>>>>>
> >>>>>> If we think it's too subtle to change commit to swallow send() errors, maybe going with `flush(FlushOptions)` would be clearer (and we can use `FlushOption#swallowSendErrorsForTransactions()` [working name] to be explicit that the `FlushOption` for now has only an effect for TX).
> >>>>>>
> >>>>>> Thoughts?
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 6/21/24 4:10 AM, Alieh Saeedi wrote:
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> It is very exciting to see all the experts here raising very good points.
> >>>>>>>
> >>>>>>> As we go further, we see more and more options to improve our solution, which makes concluding and updating the KIP impossible.
> >>>>>>>
> >>>>>>> The main suggestions so far are:
> >>>>>>>
> >>>>>>> 1. `flush` with `flushOptions` as input parameter
> >>>>>>>
> >>>>>>> 2. `commitTx` with `commitOptions` as input parameter
> >>>>>>>
> >>>>>>> 3. `send` must throw the exception
> >>>>>>>
> >>>>>>> My concern about the 3rd suggestion:
> >>>>>>>
> >>>>>>> 1. Does the change cause any issue with backward compatibility?
> >>>>>>>
> >>>>>>> 2. The `send (bad record)` already transitions the transaction to the error state. No user, including Streams, is able to transition the transaction back from the error state. Do you mean we remove the `maybeTransitionToErrorState(e)` from here <https://github.com/apache/kafka/blob/9b5b434e2a6b2d5290ea403fc02859b1c523d8aa/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1112> as well?
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Alieh
> >>>>>>>
> >>>>>>> On Fri, Jun 21, 2024 at 8:45 AM Andrew Schofield <andrew_schofi...@live.com> wrote:
> >>>>>>>
> >>>>>>>> Hi Artem,
> >>>>>>>> I think you make a good point which is worth further consideration. If any of the existing methods is really ripe for a change here, it’s the send() that actually caused the problem. If that can be fixed so there are no situations in which a lurking error breaks a transaction, that might be the best.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Andrew
> >>>>>>>>
> >>>>>>>>> On 21 Jun 2024, at 01:51, Artem Livshits <alivsh...@confluent.io.INVALID> wrote:
> >>>>>>>>>
> >>>>>>>>>> I thought we still wait for requests (and their errors) to come in and could handle fatal errors appropriately.
> >>>>>>>>>
> >>>>>>>>> We do wait for requests, but my understanding is that with commitTransaction("ignore send errors") we want to ignore errors.
> >>>>>>>>> So if we do
> >>>>>>>>>
> >>>>>>>>> 1. send
> >>>>>>>>> 2. commitTransaction("ignore send errors")
> >>>>>>>>>
> >>>>>>>>> the commit will succeed. You can look at the example in https://issues.apache.org/jira/browse/KAFKA-9279 and just substitute commitTransaction with commitTransaction("ignore send errors") and we get the buggy behavior back :-). Actually, this would potentially be even worse than the original buggy behavior because the bug was that we ignored errors that happened in the "send()" method itself, not necessarily the ones that we got from the broker.
> >>>>>>>>>
> >>>>>>>>> Actually, looking at https://github.com/apache/kafka/pull/11508/files, wouldn't a better solution be to just throw the error from the "send" method itself, rather than trying to set it to be thrown during commit? This way the example in https://issues.apache.org/jira/browse/KAFKA-9279 would be fixed, and at the same time it would give an opportunity for KS to catch the error and ignore it if needed. Not sure if we need a KIP for that, just do a better fix of the old bug.
> >>>>>>>>>
> >>>>>>>>> -Artem
> >>>>>>>>>
> >>>>>>>>> On Thu, Jun 20, 2024 at 4:58 PM Justine Olshan <jols...@confluent.io.invalid> wrote:
> >>>>>>>>>
> >>>>>>>>>> I'm a bit late to the party, but the discussion here looks reasonable. Moving the logic to a transactional method makes sense to me and makes me feel a bit better about keeping the complexity in the methods relevant to the issue.
> >>>>>>>>>>
> >>>>>>>>>>> One minor concern is that if we set "ignore send errors" (or whatever we decide to name it) option without explicit flush, it'll actually lead to broken behavior as the application won't be able to stop a commit from proceeding even on fatal errors.
> >>>>>>>>>>
> >>>>>>>>>> Is this with respect to the case a request is still inflight when we call commitTransaction? I thought we still wait for requests (and their errors) to come in and could handle fatal errors appropriately.
> >>>>>>>>>>
> >>>>>>>>>> Justine
> >>>>>>>>>>
> >>>>>>>>>> On Thu, Jun 20, 2024 at 4:32 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Matthias (and other folks who suggested ideas),
> >>>>>>>>>>>
> >>>>>>>>>>>> maybe `commitTransaction(CommitOptions)` or similar could be a good way forward?
> >>>>>>>>>>>
> >>>>>>>>>>> I like this approach. One minor concern is that if we set "ignore send errors" (or whatever we decide to name it) option without explicit flush, it'll actually lead to broken behavior as the application won't be able to stop a commit from proceeding even on fatal errors. But I guess we'll just have to clearly document it.
> >>>>>>>>>>>
> >>>>>>>>>>> In some way we are basically adding a flag to optionally restore the https://issues.apache.org/jira/browse/KAFKA-9279 bug, which is the motivation for all these changes, anyway :-).
> >>>>>>>>>>>
> >>>>>>>>>>> -Artem
> >>>>>>>>>>>
> >>>>>>>>>>> On Thu, Jun 20, 2024 at 2:18 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Seems the option to use a config does not get a lot of support.
> >>>>>>>>>>>>
> >>>>>>>>>>>> So we need to go with some form of "overload / new method". I think Chris' point about not coupling it to `flush()` but rather `commitTransaction()` is actually a very good one; for non-tx case, the different flush variants would not make sense.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I also like Lianet's idea to pass in some "options" object, so maybe `commitTransaction(CommitOptions)` or similar could be a good way forward? It's much better than a `boolean` parameter, aesthetically, as well as extendable in the future if necessary.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Given that we would pass in an optional parameter, we might not even need to deprecate the existing `commitTransaction()` method?
> >>>>>>>>>>>>
> >>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 6/20/24 9:12 AM, Andrew Schofield wrote:
> >>>>>>>>>>>>> Hi Alieh,
> >>>>>>>>>>>>> Thanks for the KIP.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I *really* don’t like adding a config which changes the behaviour of the flush() method. We already have too many configs. But I totally understand the problem that you’re trying to solve and some of the other suggestions in this thread seem neater.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Personally, I would add another method to KafkaProducer. Not an overload on flush() because this is not flush() at all. Using Matthias’s options, I prefer (3).
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>> Andrew
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 20 Jun 2024, at 15:08, Lianet M. <liane...@gmail.com> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi all, thanks for the KIP Alieh!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> LM1. Totally agree with Artem's point about the config not being the most explicit/flexible way to express this capability. Getting then to Matthias's 4 options, what I don't like about 3 and 4 is that it seems they might not age very well? Aren't we going to be wanting some other twist to the flush semantics that will have us adding yet another param to it, or another overloaded method? I truly don't have the context to answer that, but if it feels like a realistic future maybe adding some kind of FlushOptions param to the flush would be better from an extensibility point of view. It would only have the clearErrors option available for now but could accept any other we may need. I find that this would remove the "ugliness" Matthias pointed out for 3. and 4.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> LM2. No matter how we end up expressing the different semantics for flush, let's make sure we update the KIP on the flush and commitTransaction java docs. It currently states that flush "clears the last exception" and commitTransaction "will NOT throw" if called after flush, but it really all depends on the config/options/method used.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> LM3. I find it would be helpful to include an example to show the new flow that we're unblocking (I see this as the great gain here): flush with clear error option enabled -> catch and do whatever error handling we want -> commitTransaction successfully
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Lianet
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Wed, Jun 19, 2024 at 11:26 PM Chris Egerton <fearthecel...@gmail.com> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi Matthias,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I like the alternatives you've listed. One more that might help is if, instead of overloading flush(), we overloaded commitTransaction() to something like commitTransaction(boolean tolerateRecordErrors). This seems slightly cleaner in that it takes the behavioral change we want, which only applies to transactional producers, to an API method that is only used for transactional producers. It would also avoid the issue of whether or not flush() (or a new variant of it with altered semantics) should throw or not. Thoughts?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi Alieh,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for the KIP, I like this direction a lot more than the pluggable handler!
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I share Artem's concerns that enabling this behavior via configuration doesn't seem like a great fit.
> >>>>>>>>>>>>>>> It's likely that application code will be written in a style that only works with one type of behavior from transactional producers, so requiring that application code to declare its expectations for the behavior of its producer seems more appropriate than, e.g., allowing users deploying that application to tweak a configuration file that gets fed to producers spun up inside it.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Chris
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Wed, Jun 19, 2024 at 10:32 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks for the KIP Alieh. I actually like the KIP as-is, but think Artem raises very good points...
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Seems we have four options on how to move forward?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 1. add config to allow "silent error clearance" as the KIP proposes
> >>>>>>>>>>>>>>>> 2. change flush() to clear error and let it throw
> >>>>>>>>>>>>>>>> 3. add new `flushAndThrow()` (or better name) which clears error and throws
> >>>>>>>>>>>>>>>> 4. add `flush(boolean clearAndThrow)` and let user pick (and deprecate existing `flush()`)
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> For (2), given that it would be a behavior change, we might also need a public "feature flag" config.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> It seems, both (1) and (2) have the issue Artem mentioned.
> >>>>>>>>>>>>>>>> (3) and (4) would be safer to this end, however, for both we kinda get an ugly API?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Not sure right now if I have any preference. Seems we need to pick some evil and that there is no clear best solution? Would be good to hear from others what they think
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On 6/18/24 8:39 PM, Artem Livshits wrote:
> >>>>>>>>>>>>>>>>> Hi Alieh,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thank you for the KIP. I have a couple of suggestions:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> AL1. We should throw an error from flush after we clear it. This would make it so that both "send + commit" and "send + flush + commit" (the latter looks like just a more verbose way to express the former, and it would be intuitive if it behaves the same) would throw if the transaction has an error (so if the code is written either way it's going to be correct). At the same time, the latter could be extended by the caller to intercept exceptions from flush, ignore as needed, and commit the transaction.
> >>>>>>>>>>>>>>>>> This solution would keep basic things simple (if someone has code that doesn't require advanced error handling, then basic "send + flush + commit" would do the right thing) and advanced things possible, an application can add try + catch around flush and ignore some errors.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> AL2. I'm not sure if config is the best way to express the modification of the "flush" semantics -- the application logic that calls "flush" needs to match the "flush" semantics and configuring semantics in a detached place creates room for bugs due to discrepancies. This can be especially bad if the producer loads configuration from a file at run time, in that case a mistake in configuration could break the application because it was written to expect one "flush" semantics but the semantics is switched.
> >>>>>>>>>>>>>>>>> Given that the "flush" semantics needs to match the caller's expectation, a way to accomplish that would be to pass the caller's expectation to the "flush" call by either having a method with a different name or having an overload with a Boolean flag that would configure the semantics (the current method could just redirect to the new one).
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> -Artem
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Mon, Jun 17, 2024 at 9:09 AM Alieh Saeedi <asae...@confluent.io.invalid> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I'd like to kick off a discussion for KIP-1059 that suggests adding a new feature to the Producer flush() method.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1059%3A+Enable+the+Producer+flush%28%29+method+to+clear+the+latest+send%28%29+error
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>> Alieh