Hi Chris,
That works for me too. I slightly prefer an option on flush(), but what you suggested works too.

Thanks,
Andrew

> On 24 Jun 2024, at 15:14, Chris Egerton <chr...@aiven.io.INVALID> wrote:
>
> Hi Andrew,
>
> I like a lot of what you said, but I still believe it's better to overload commitTransaction than flush. Users will already have to manually opt in to ignoring errors encountered during transactions, and we can document recommended usage (i.e., explicitly invoking flush() before invoking commitTransaction(ignoreRecordErrors)) in the newly-introduced method. I don't believe it's worth the increased cognitive load on users with non-transactional producers to introduce an overloaded flush() variant.
>
> Cheers,
>
> Chris
>
> On Mon, Jun 24, 2024 at 9:39 AM Andrew Schofield <andrew_schofi...@live.com> wrote:
>
>> Hi Alieh,
>> Thanks for driving this. Unfortunately, there are many parts of the API which are a bit unfortunate and it’s tricky to make small improvements that don’t have downsides.
>>
>> I don’t like the idea of using a configuration because configuration is often outside the application and changing the behaviour of someone else’s application without understanding it is risky. Anything which embeds a transactional producer could have its behaviour changed unexpectedly.
>>
>> It would have been much nicer if send() didn’t fail silently and change the transaction state. But, because it’s an asynchronous operation, I don’t really think we can just make it throw all exceptions, even though I really think that `send()` is the method with the problem here.
>>
>> The contract of `flush()` is that it makes sure that all preceding sends will have completed, so it should be true that a well written application would be able to know which records were OK because of the Future<RecordMetadata> returned by the `send()` method. It should be able to determine whether it wants to commit the transaction even if some of the intended operations didn’t succeed.
>>
>> What we don’t currently have is a way for the application to say to the KafkaProducer that it knows the outcome of sending the records and to confirm that it wants to proceed. Then it would not be necessary for `commitTransaction()` to throw an exception to report a historical error which the application might choose to ignore.
>>
>> Having read the comments, I think the KIP is on the right lines focusing on the `flush()` method. My suggestion is that we introduce an option on `flush()` to be used before `commitTransaction()` for applications that want to be able to commit transactions which had known failed operations.
>>
>> The code would be:
>>
>>     producer.beginTransaction();
>>
>>     future1 = producer.send(goodRecord1);
>>     future2 = producer.send(badRecord); // The future from this call will complete exceptionally
>>     future3 = producer.send(goodRecord2);
>>
>>     producer.flush(FlushOption.TRANSACTION_READY);
>>
>>     // At this point, we know that all 3 futures are complete and the transaction contains 2 records
>>     producer.commitTransaction();
>>
>> I wouldn’t deprecate `flush()` with no option. It just uses the default option which behaves like today.
>>
>> Why did I suggest an option on `flush()` rather than `commitTransaction()`? Because with `flush()`, it’s clear when the application is stating that it’s seen all of the results from its `send()` calls and it’s ready to proceed. If it has to rely on flushing that occurs inside `commitTransaction()`, I don’t see it’s as clear-cut.
>>
>> Thanks,
>> Andrew
>>
>>> On 24 Jun 2024, at 13:44, Alieh Saeedi <asae...@confluent.io.INVALID> wrote:
>>>
>>> Hi all,
>>> Thanks for the interesting discussion.
>>>
>>> I assume that now the main questions are as follows:
>>>
>>> 1. Do we need to transition the transaction to the error state for API exceptions?
>>> 2. Should we throw the API exception in `send()` instead of returning a future error?
>>> 3. If the answer to question (1) is NO and to question (2) is YES, do we need to change the current `flush` or `commitTnx` at all?
>>>
>>> Cheers,
>>> Alieh
>>>
>>> On Sat, Jun 22, 2024 at 3:21 AM Matthias J. Sax <mj...@apache.org> wrote:
>>>
>>>> Hey Kirk,
>>>>
>>>> can you elaborate on a few points?
>>>>
>>>>> Otherwise users would have to know to explicitly change their code to invoke flush().
>>>>
>>>> Why? If we would add an option to `flush(FlushOption)`, the existing `flush()` w/o any option will still be there, right? If we would really deprecate existing `flush()`, it would just mean that we would pass "default FlushOption" into an implicit flush (and yes, we would need to define what this would be).
>>>>
>>>> I think there is no clear winner (as pointed out in my last reply), and both `flush(FlushOption)` and `commitTx(CommitOption)` have advantages and drawbacks. Guess we need to just agree on which tradeoff we want to move forward with?
>>>>
>>>> Not sure if your database example is a 1:1 fit? I think, the better comparison would be:
>>>>
>>>>     BEGIN TX;
>>>>     INSERT INTO foo VALUES ('a');
>>>>     INSERT INTO foo VALUES ('b');
>>>>     INSERT INTO foo VALUES ('c');
>>>>     INSERT INTO foo VALUES ('not sure');
>>>>
>>>> For this case, the full TX would roll back, right? I still think that allowing users to just skip over the last error, and continue the TX would be ok. In the end, we provide a programmatic API, and not a declarative one as SQL. Of course, default behavior would still be to put the producer into error state, and the user would need to call `abortTransaction()` to move forward.
>>>>
>>>> -Matthias
>>>>
>>>> On 6/21/24 5:26 PM, Kirk True wrote:
>>>>> Hi Matthias,
>>>>>
>>>>>> On Jun 21, 2024, at 12:28 PM, Matthias J. Sax <mj...@apache.org> wrote:
>>>>>>
>>>>>> If we want to limit it to `RecordTooLargeException` throwing from `send()` directly makes sense. Thanks for calling it out.
>>>>>>
>>>>>> It's still a question of backward compatibility? `send()` does throw exceptions already, including generic `KafkaException`. Not sure if this helps with backward compatibility? Could we just add a new exception type (which is a child of `KafkaException`)?
>>>>>>
>>>>>> The Producer JavaDocs are not totally explicit about it IMHO.
>>>>>>
>>>>>> I think we could expect that some generic error handling path gets executed. For the TX-case, I would assume that a TX would be aborted if `send()` throws or that the producer would be `closed()`. Overall this might be safe?
>>>>>>
>>>>>>> It would be a little less flexible though, since (as you note) it would still be impossible to commit transactions after errors have been reported from brokers.
>>>>>>
>>>>>> KS would still need a way to clear the error state of the producer. We could catch a `RecordTooLargeException` from `send()`, call the handler and let it decide what to do next. But if it does return `CONTINUE` to swallow the error and drop the poison pill record on the floor, we would want to move forward and commit the transaction.
>>>>>>
>>>>>> But the question is: if we cannot add a record to the tx, does the producer need to go into error state? In the end, we did throw and inform the app that the record was _not_ added, and it's up to the app to decide what to do next?
>>>>>
>>>>> That’s an excellent question…
>>>>>
>>>>> Imagine the user’s application is writing information to a database instead of Kafka. If there’s a table with a CHAR(1) column and this SQL statement was attempted, what should happen?
>>>>>
>>>>>     INSERT INTO foo VALUES ('not sure');
>>>>>
>>>>> Yes, that DML would fail, sure, but would the user expect that the connection used by the database library would get stuck in some kind of error state? A user would be able to catch the error and either continue or abort, based on their business rules.
>>>>>
>>>>> So I agree with what I believe you’re implying: we shouldn’t poison the Producer/TransactionManager on certain types of application-level errors in send().
>>>>>
>>>>> Kirk
>>>>>
>>>>>> If we report the error only via the `Callback` it's a different story, because the contract for this case is clearly specified on the JavaDocs:
>>>>>>
>>>>>>> When used as part of a transaction, it is not necessary to define a callback or check the result of the future in order to detect errors from <code>send</code>. If any of the send calls failed with an irrecoverable error, the final {@link #commitTransaction()} call will fail and throw the exception from the last failed send. When this happens, your application should call {@link #abortTransaction()} to reset the state and continue to send data.
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 6/21/24 11:42 AM, Chris Egerton wrote:
>>>>>>> Hi Artem,
>>>>>>>
>>>>>>> I think it'd make sense to throw directly from send whenever possible, instead of returning an already-completed future. I didn't do that in my bug fix to try to be conservative about breaking changes but this seems to have caused its own set of headaches. It would be a little less flexible though, since (as you note) it would still be impossible to commit transactions after errors have been reported from brokers.
>>>>>>>
>>>>>>> I'll leave it up to the Kafka Streams folks to decide if that flexibility is required. If it is, then users could explicitly call flush() before committing (and ignoring errors for) or aborting a transaction, if they want to implement fine-grained error handling logic such as allowing errors for a subset of topics to be ignored.
>>>>>>>
>>>>>>> Hi Matthias,
>>>>>>>
>>>>>>> Most of the time you're right and we can't throw from send(); however, in this case (client-side record-too-large exception), the error is actually noticed by the producer before send() returns, so it should be possible to throw directly.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Chris
>>>>>>>
>>>>>>> On Fri, Jun 21, 2024, 14:25 Matthias J. Sax <mj...@apache.org> wrote:
>>>>>>>> Not sure if we can change send and make it throw, given that send() is async? That is why users can register a `Callback` to begin with, right?
>>>>>>>>
>>>>>>>> And Alieh's point about backward compatibility is also a fair concern.
>>>>>>>>
>>>>>>>>> Actually, this would potentially be even worse than the original buggy behavior because the bug was that we ignored errors that happened in the "send()" method itself, not necessarily the ones that we got from the broker.
>>>>>>>>
>>>>>>>> My understanding was that `commitTx(swallowError)` would only swallow `send()` errors, not errors about the actual commit. I agree that it would be very bad to swallow errors about the actual tx commit...
>>>>>>>>
>>>>>>>> It's a fair question if this might be too subtle; to make it explicit, we could use `CommitOptions#ignorePendingSendErrors()` [working name] to make it clear.
>>>>>>>>
>>>>>>>> If we think it's too subtle to change commit to swallow send() errors, maybe going with `flush(FlushOptions)` would be clearer (and we can use `FlushOption#swallowSendErrorsForTransactions()` [working name] to be explicit that the `FlushOption` for now has only an effect for TX).
>>>>>>>>
>>>>>>>> Thoughts?
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 6/21/24 4:10 AM, Alieh Saeedi wrote:
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> It is very exciting to see all the experts here raising very good points.
>>>>>>>>>
>>>>>>>>> As we go further, we see more and more options to improve our solution, which makes concluding and updating the KIP impossible.
>>>>>>>>>
>>>>>>>>> The main suggestions so far are:
>>>>>>>>>
>>>>>>>>> 1. `flush` with `flushOptions` as input parameter
>>>>>>>>> 2. `commitTx` with `commitOptions` as input parameter
>>>>>>>>> 3. `send` must throw the exception
>>>>>>>>>
>>>>>>>>> My concern about the 3rd suggestion:
>>>>>>>>>
>>>>>>>>> 1. Does the change cause any issue with backward compatibility?
>>>>>>>>> 2. The `send (bad record)` already transitions the transaction to the error state. No user, including Streams, is able to transition the transaction back from the error state. Do you mean we remove the `maybeTransitionToErrorState(e)` from here <https://github.com/apache/kafka/blob/9b5b434e2a6b2d5290ea403fc02859b1c523d8aa/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1112> as well?
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Alieh
>>>>>>>>>
>>>>>>>>> On Fri, Jun 21, 2024 at 8:45 AM Andrew Schofield <andrew_schofi...@live.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Artem,
>>>>>>>>>> I think you make a good point which is worth further consideration. If any of the existing methods is really ripe for a change here, it’s the send() that actually caused the problem. If that can be fixed so there are no situations in which a lurking error breaks a transaction, that might be the best.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Andrew
>>>>>>>>>>
>>>>>>>>>>> On 21 Jun 2024, at 01:51, Artem Livshits <alivsh...@confluent.io.INVALID> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I thought we still wait for requests (and their errors) to come in and could handle fatal errors appropriately.
>>>>>>>>>>>
>>>>>>>>>>> We do wait for requests, but my understanding is that when we call commitTransaction("ignore send errors") we want to ignore errors. So if we do
>>>>>>>>>>>
>>>>>>>>>>> 1. send
>>>>>>>>>>> 2. commitTransaction("ignore send errors")
>>>>>>>>>>>
>>>>>>>>>>> the commit will succeed. You can look at the example in https://issues.apache.org/jira/browse/KAFKA-9279 and just substitute commitTransaction with commitTransaction("ignore send errors") and we get the buggy behavior back :-). Actually, this would potentially be even worse than the original buggy behavior because the bug was that we ignored errors that happened in the "send()" method itself, not necessarily the ones that we got from the broker.
>>>>>>>>>>>
>>>>>>>>>>> Actually, looking at https://github.com/apache/kafka/pull/11508/files, wouldn't a better solution be to just throw the error from the "send" method itself, rather than trying to set it to be thrown during commit? This way the example in https://issues.apache.org/jira/browse/KAFKA-9279 would be fixed, and at the same time it would give an opportunity for KS to catch the error and ignore it if needed. Not sure if we need a KIP for that, just do a better fix of the old bug.
>>>>>>>>>>>
>>>>>>>>>>> -Artem
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jun 20, 2024 at 4:58 PM Justine Olshan <jols...@confluent.io.invalid> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I'm a bit late to the party, but the discussion here looks reasonable. Moving the logic to a transactional method makes sense to me and makes me feel a bit better about keeping the complexity in the methods relevant to the issue.
>>>>>>>>>>>>
>>>>>>>>>>>>> One minor concern is that if we set "ignore send errors" (or whatever we decide to name it) option without explicit flush, it'll actually lead to broken behavior as the application won't be able to stop a commit from proceeding even on fatal errors.
>>>>>>>>>>>>
>>>>>>>>>>>> Is this with respect to the case a request is still inflight when we call commitTransaction? I thought we still wait for requests (and their errors) to come in and could handle fatal errors appropriately.
>>>>>>>>>>>>
>>>>>>>>>>>> Justine
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Jun 20, 2024 at 4:32 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Matthias (and other folks who suggested ideas),
>>>>>>>>>>>>>
>>>>>>>>>>>>>> maybe `commitTransaction(CommitOptions)` or similar could be a good way forward?
>>>>>>>>>>>>>
>>>>>>>>>>>>> I like this approach. One minor concern is that if we set "ignore send errors" (or whatever we decide to name it) option without explicit flush, it'll actually lead to broken behavior as the application won't be able to stop a commit from proceeding even on fatal errors. But I guess we'll just have to clearly document it.
>>>>>>>>>>>>>
>>>>>>>>>>>>> In some way we are basically adding a flag to optionally restore the https://issues.apache.org/jira/browse/KAFKA-9279 bug, which is the motivation for all these changes, anyway :-).
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Artem
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jun 20, 2024 at 2:18 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Seems the option to use a config does not get a lot of support.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> So we need to go with some form of "overload / new method". I think Chris' point about not coupling it to `flush()` but rather `commitTransaction()` is actually a very good one; for non-tx case, the different flush variants would not make sense.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I also like Lianet's idea to pass in some "options" object, so maybe `commitTransaction(CommitOptions)` or similar could be a good way forward? It's much better than a `boolean` parameter, aesthetically, as well as extendable in the future if necessary.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Given that we would pass in an optional parameter, we might not even need to deprecate the existing `commitTransaction()` method?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 6/20/24 9:12 AM, Andrew Schofield wrote:
>>>>>>>>>>>>>>> Hi Alieh,
>>>>>>>>>>>>>>> Thanks for the KIP.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I *really* don’t like adding a config which changes the behaviour of the flush() method. We already have too many configs. But I totally understand the problem that you’re trying to solve and some of the other suggestions in this thread seem neater.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Personally, I would add another method to KafkaProducer. Not an overload on flush() because this is not flush() at all. Using Matthias’s options, I prefer (3).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Andrew
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 20 Jun 2024, at 15:08, Lianet M. <liane...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi all, thanks for the KIP Alieh!
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> LM1. Totally agree with Artem's point about the config not being the most explicit/flexible way to express this capability. Getting then to Matthias's 4 options, what I don't like about 3 and 4 is that it seems they might not age very well? Aren't we going to be wanting some other twist to the flush semantics that will have us adding yet another param to it, or another overloaded method? I truly don't have the context to answer that, but if it feels like a realistic future maybe adding some kind of FlushOptions param to the flush would be better from an extensibility point of view. It would only have the clearErrors option available for now but could accept any other we may need. I find that this would remove the "ugliness" Matthias pointed out for 3. and 4.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> LM2. No matter how we end up expressing the different semantics for flush, let's make sure we update the KIP on the flush and commitTransaction java docs. It currently states that flush "clears the last exception" and commitTransaction "will NOT throw" if called after flush, but it really all depends on the config/options/method used.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> LM3. I find it would be helpful to include an example to show the new flow that we're unblocking (I see this as the great gain here): flush with clear error option enabled -> catch and do whatever error handling we want -> commitTransaction successfully
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Lianet
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Jun 19, 2024 at 11:26 PM Chris Egerton <fearthecel...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I like the alternatives you've listed. One more that might help is if, instead of overloading flush(), we overloaded commitTransaction() to something like commitTransaction(boolean tolerateRecordErrors). This seems slightly cleaner in that it takes the behavioral change we want, which only applies to transactional producers, to an API method that is only used for transactional producers. It would also avoid the issue of whether or not flush() (or a new variant of it with altered semantics) should throw or not. Thoughts?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Alieh,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for the KIP, I like this direction a lot more than the pluggable handler!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I share Artem's concerns that enabling this behavior via configuration doesn't seem like a great fit. It's likely that application code will be written in a style that only works with one type of behavior from transactional producers, so requiring that application code to declare its expectations for the behavior of its producer seems more appropriate than, e.g., allowing users deploying that application to tweak a configuration file that gets fed to producers spun up inside it.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Chris
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Jun 19, 2024 at 10:32 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks for the KIP Alieh. I actually like the KIP as-is, but think Artem raises very good points...
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Seems we have four options on how to move forward?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 1. add config to allow "silent error clearance" as the KIP proposes
>>>>>>>>>>>>>>>>>> 2. change flush() to clear error and let it throw
>>>>>>>>>>>>>>>>>> 3. add new `flushAndThrow()` (or better name) which clears error and throws
>>>>>>>>>>>>>>>>>> 4. add `flush(boolean clearAndThrow)` and let user pick (and deprecate existing `flush()`)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> For (2), given that it would be a behavior change, we might also need a public "feature flag" config.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> It seems, both (1) and (2) have the issue Artem mentioned. (3) and (4) would be safer to this end, however, for both we kinda get an ugly API?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Not sure right now if I have any preference. Seems we need to pick some evil and that there is no clear best solution? Would be good to hear from others what they think
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On 6/18/24 8:39 PM, Artem Livshits wrote:
>>>>>>>>>>>>>>>>>>> Hi Alieh,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thank you for the KIP. I have a couple of suggestions:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> AL1. We should throw an error from flush after we clear it. This would make it so that both "send + commit" and "send + flush + commit" (the latter looks like just a more verbose way to express the former, and it would be intuitive if it behaves the same) would throw if the transaction has an error (so if the code is written either way it's going to be correct). At the same time, the latter could be extended by the caller to intercept exceptions from flush, ignore as needed, and commit the transaction. This solution would keep basic things simple (if someone has code that doesn't require advanced error handling, then basic "send + flush + commit" would do the right thing) and advanced things possible, an application can add try + catch around flush and ignore some errors.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> AL2. I'm not sure if config is the best way to express the modification of the "flush" semantics -- the application logic that calls "flush" needs to match the "flush" semantics and configuring semantics in a detached place creates room for bugs due to discrepancies. This can be especially bad if the producer loads configuration from a file at run time, in that case a mistake in configuration could break the application because it was written to expect one "flush" semantics but the semantics is switched. Given that the "flush" semantics needs to match the caller's expectation, a way to accomplish that would be to pass the caller's expectation to the "flush" call by either having a method with a different name or having an overload with a Boolean flag that would configure the semantics (the current method could just redirect to the new one).
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> -Artem
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Mon, Jun 17, 2024 at 9:09 AM Alieh Saeedi <asae...@confluent.io.invalid> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I'd like to kick off a discussion for KIP-1059 that suggests adding a new feature to the Producer flush() method.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1059%3A+Enable+the+Producer+flush%28%29+method+to+clear+the+latest+send%28%29+error
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>> Alieh
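
For readers following the thread, the flow that several messages describe (send records, make sure every send has completed, inspect the per-record results, then decide whether to commit or abort) can be sketched without the Kafka client at all. The block below is only an illustration under stated assumptions: `send`, `decide`, `Decision` and `RecordTooLargeSketchException` are hypothetical stand-ins invented for this sketch, not part of the KafkaProducer API or of KIP-1059, and the "tolerate oversized records, abort on anything else" policy is just one possible application rule.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class TolerantCommitSketch {

    /** Hypothetical stand-in for a client-side "record too large" failure. */
    static class RecordTooLargeSketchException extends RuntimeException {
        RecordTooLargeSketchException(String message) {
            super(message);
        }
    }

    /** What the application decides to do with the open transaction. */
    enum Decision { COMMIT, ABORT }

    /**
     * Hypothetical stand-in for send(): the size check happens before the call
     * returns, so the returned future is already complete either way.
     */
    static CompletableFuture<Integer> send(String record, int maxSize) {
        if (record.length() > maxSize) {
            return CompletableFuture.failedFuture(new RecordTooLargeSketchException(
                    "record length " + record.length() + " exceeds " + maxSize));
        }
        return CompletableFuture.completedFuture(record.length());
    }

    /**
     * Inspects futures that are assumed to be complete already (the role that
     * flush() plays in the thread). Oversized records are tolerated; any other
     * failure leads to an abort.
     */
    static Decision decide(List<CompletableFuture<Integer>> results) {
        for (CompletableFuture<Integer> result : results) {
            try {
                result.join();
            } catch (CompletionException e) {
                if (!(e.getCause() instanceof RecordTooLargeSketchException)) {
                    return Decision.ABORT;
                }
            }
        }
        return Decision.COMMIT;
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> results = List.of(
                send("good-1", 8),
                send("far too large", 8), // completes exceptionally
                send("good-2", 8));
        System.out.println(decide(results));
    }
}
```

In a real application the decision would feed into a call to commit or abort the transaction; which producer method carries the "I have seen the errors" signal (an option on flush() or on commitTransaction()) is exactly what the thread is still debating, so the sketch deliberately leaves that part out.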