Hi Matthias,

> On Jun 21, 2024, at 12:28 PM, Matthias J. Sax <mj...@apache.org> wrote:
>
> If we want to limit it to `RecordTooLargeException` throwing from `send()`
> directly make sense. Thanks for calling it out.
>
> It's still a question of backward compatibility? `send()` does throw
> exceptions already, including generic `KafkaException`. Not sure if this
> helps with backward compatibility? Could we just add a new exception type
> (which is a child of `KafkaException`)?
>
> The Producer JavaDocs are not totally explicit about it IMHO.
>
> I think we could expect that some generic error handling path gets executed.
> For the TX-case, I would assume that a TX would be aborted if `send()` throws
> or that the producer would be `closed()`. Overall this might be safe?
>
>>> It would be a little less flexible though, since (as you note) it would
>>> still be impossible to commit transactions after errors have been
>>> reported from brokers.
>
> KS would still need a way to clear the error state of the producer. We could
> catch a `RecordTooLargeException` from `send()`, call the handler and let it
> decide what to do next. But if it does return `CONTINUE` to swallow the error
> and drop the poison pill record on the floor, we would want to move forward
> and commit the transaction.
>
> But the question is: if we cannot add a record to the tx, does the producer
> need to go into error state? In the end, we did throw and inform the app that
> the record was _not_ added, and it's up to the app to decide what to do next?

That’s an excellent question… Imagine the user’s application is writing
information to a database instead of Kafka. If there’s a table with a CHAR(1)
column and this SQL statement was attempted, what should happen?

    INSERT INTO foo VALUES ('not sure');

Yes, that DML would fail, sure, but would the user expect that the connection
used by the database library would get stuck in some kind of error state? A
user would be able to catch the error and either continue or abort, based on
their business rules.

So I agree with what I believe you’re implying: we shouldn’t poison the
Producer/TransactionManager on certain types of application-level errors in
send().

Kirk

> If we report the error only via the `Callback` it's a different story,
> because the contract for this case is clearly specified on the JavaDocs:
>
>> When used as part of a transaction, it is not necessary to define a
>> callback or check the result of the future in order to detect errors from
>> <code>send</code>. If any of the send calls failed with an irrecoverable
>> error, the final {@link #commitTransaction()} call will fail and throw the
>> exception from the last failed send. When this happens, your application
>> should call {@link #abortTransaction()} to reset the state and continue to
>> send data.
>
> -Matthias
>
> On 6/21/24 11:42 AM, Chris Egerton wrote:
>> Hi Artem,
>> I think it'd make sense to throw directly from send whenever possible,
>> instead of returning an already-completed future. I didn't do that in my
>> bug fix to try to be conservative about breaking changes but this seems to
>> have caused its own set of headaches. It would be a little less flexible
>> though, since (as you note) it would still be impossible to commit
>> transactions after errors have been reported from brokers.
>> I'll leave it up to the Kafka Streams folks to decide if that flexibility
>> is required.
>> If it is, then users could explicitly call flush() before
>> committing (and ignoring errors for) or aborting a transaction, if they
>> want to implement fine-grained error handling logic such as allowing errors
>> for a subset of topics to be ignored.
>> Hi Matthias,
>> Most of the time you're right and we can't throw from send(); however, in
>> this case (client-side record-too-large exception), the error is actually
>> noticed by the producer before send() returns, so it should be possible to
>> throw directly.
>> Cheers,
>> Chris
>> On Fri, Jun 21, 2024, 14:25 Matthias J. Sax <mj...@apache.org> wrote:
>>> Not sure if we can change send and make it throw, given that send() is
>>> async? That is why users can register a `Callback` to begin with, right?
>>>
>>> And Alieh's point about backward compatibility is also a fair concern.
>>>
>>>> Actually, this would potentially be even worse than the original buggy
>>>> behavior because the bug was that we ignored errors that happened in
>>>> the "send()" method itself, not necessarily the ones that we got from
>>>> the broker.
>>>
>>> My understanding was that `commitTx(swallowError)` would only swallow
>>> `send()` errors, not errors about the actual commit. I agree that it
>>> would be very bad to swallow errors about the actual tx commit...
>>>
>>> It's a fair question if this might be too subtle; to make it explicit,
>>> we could use `CommitOptions#ignorePendingSendErrors()` [working name] to
>>> make it clear.
>>>
>>> If we think it's too subtle to change commit to swallow send() errors,
>>> maybe going with `flush(FlushOptions)` would be clearer (and we can use
>>> `FlushOptions#swallowSendErrorsForTransactions()` [working name] to be
>>> explicit that the `FlushOptions` for now has only an effect for TX).
>>>
>>> Thoughts?
>>>
>>> -Matthias
>>>
>>> On 6/21/24 4:10 AM, Alieh Saeedi wrote:
>>>> Hi all,
>>>>
>>>> It is very exciting to see all the experts here raising very good points.
>>>>
>>>> As we go further, we see more and more options to improve our solution,
>>>> which makes concluding and updating the KIP impossible.
>>>>
>>>> The main suggestions so far are:
>>>>
>>>> 1. `flush` with `flushOptions` as input parameter
>>>>
>>>> 2. `commitTx` with `commitOptions` as input parameter
>>>>
>>>> 3. `send` must throw the exception
>>>>
>>>> My concern about the 3rd suggestion:
>>>>
>>>> 1. Does the change cause any issue with backward compatibility?
>>>>
>>>> 2. The `send (bad record)` already transits the transaction to the error
>>>> state. No user, including Streams is able to transit the transaction back
>>>> from the error state. Do you mean we remove the
>>>> `maybeTransitionToErrorState(e)` from here
>>>> <https://github.com/apache/kafka/blob/9b5b434e2a6b2d5290ea403fc02859b1c523d8aa/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1112>
>>>> as well?
>>>>
>>>> Cheers,
>>>> Alieh
>>>>
>>>> On Fri, Jun 21, 2024 at 8:45 AM Andrew Schofield <andrew_schofi...@live.com>
>>>> wrote:
>>>>
>>>>> Hi Artem,
>>>>> I think you make a good point which is worth further consideration. If
>>>>> any of the existing methods is really ripe for a change here, it’s the
>>>>> send() that actually caused the problem. If that can be fixed so there
>>>>> are no situations in which a lurking error breaks a transaction, that
>>>>> might be the best.
>>>>>
>>>>> Thanks,
>>>>> Andrew
>>>>>
>>>>>> On 21 Jun 2024, at 01:51, Artem Livshits <alivsh...@confluent.io.INVALID>
>>>>>> wrote:
>>>>>>
>>>>>>> I thought we still wait for requests (and their errors) to come in and
>>>>>>> could handle fatal errors appropriately.
>>>>>>
>>>>>> We do wait for requests, but my understanding is that when
>>>>>> commitTransaction("ignore send errors") we want to ignore errors. So
>>>>>> if we do
>>>>>>
>>>>>> 1. send
>>>>>> 2. commitTransaction("ignore send errors")
>>>>>>
>>>>>> the commit will succeed. You can look at the example in
>>>>>> https://issues.apache.org/jira/browse/KAFKA-9279 and just substitute
>>>>>> commitTransaction with commitTransaction("ignore send errors") and we
>>>>>> get the buggy behavior back :-). Actually, this would potentially be even
>>>>>> worse than the original buggy behavior because the bug was that we
>>>>>> ignored errors that happened in the "send()" method itself, not
>>>>>> necessarily the ones that we got from the broker.
>>>>>>
>>>>>> Actually, looking at https://github.com/apache/kafka/pull/11508/files,
>>>>>> wouldn't a better solution be to just throw the error from the "send"
>>>>>> method itself, rather than trying to set it to be thrown during commit?
>>>>>> This way the example in https://issues.apache.org/jira/browse/KAFKA-9279
>>>>>> would be fixed, and at the same time it would give an opportunity for
>>>>>> KS to catch the error and ignore it if needed. Not sure if we need a KIP
>>>>>> for that, just do a better fix of the old bug.
>>>>>>
>>>>>> -Artem
>>>>>>
>>>>>> On Thu, Jun 20, 2024 at 4:58 PM Justine Olshan
>>>>>> <jols...@confluent.io.invalid> wrote:
>>>>>>
>>>>>>> I'm a bit late to the party, but the discussion here looks reasonable.
>>>>>>> Moving the logic to a transactional method makes sense to me and makes
>>>>>>> me feel a bit better about keeping the complexity in the methods
>>>>>>> relevant to the issue.
>>>>>>>
>>>>>>>> One minor concern is that if we set "ignore send errors" (or whatever
>>>>>>>> we decide to name it) option without explicit flush, it'll actually
>>>>>>>> lead to broken behavior as the application won't be able to stop a
>>>>>>>> commit from proceeding even on fatal errors.
>>>>>>>
>>>>>>> Is this with respect to the case a request is still inflight when we
>>>>>>> call commitTransaction? I thought we still wait for requests (and their
>>>>>>> errors) to come in and could handle fatal errors appropriately.
>>>>>>>
>>>>>>> Justine
>>>>>>>
>>>>>>> On Thu, Jun 20, 2024 at 4:32 PM Artem Livshits
>>>>>>> <alivsh...@confluent.io.invalid> wrote:
>>>>>>>
>>>>>>>> Hi Matthias (and other folks who suggested ideas),
>>>>>>>>
>>>>>>>>> maybe `commitTransaction(CommitOptions)` or similar could be a good
>>>>>>>>> way forward?
>>>>>>>>
>>>>>>>> I like this approach. One minor concern is that if we set "ignore
>>>>>>>> send errors" (or whatever we decide to name it) option without explicit
>>>>>>>> flush, it'll actually lead to broken behavior as the application won't
>>>>>>>> be able to stop a commit from proceeding even on fatal errors. But I
>>>>>>>> guess we'll just have to clearly document it.
>>>>>>>>
>>>>>>>> In some way we are basically adding a flag to optionally restore the
>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-9279 bug, which is the
>>>>>>>> motivation for all these changes, anyway :-).
>>>>>>>>
>>>>>>>> -Artem
>>>>>>>>
>>>>>>>> On Thu, Jun 20, 2024 at 2:18 PM Matthias J. Sax <mj...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Seems the option to use a config does not get a lot of support.
>>>>>>>>>
>>>>>>>>> So we need to go with some form of "overload / new method".
>>>>>>>>> I think Chris' point about not coupling it to `flush()` but rather
>>>>>>>>> `commitTransaction()` is actually a very good one; for non-tx case,
>>>>>>>>> the different flush variants would not make sense.
>>>>>>>>>
>>>>>>>>> I also like Lianet's idea to pass in some "options" object, so maybe
>>>>>>>>> `commitTransaction(CommitOptions)` or similar could be a good way
>>>>>>>>> forward? It's much better than a `boolean` parameter, aesthetically,
>>>>>>>>> as well as extendable in the future if necessary.
>>>>>>>>>
>>>>>>>>> Given that we would pass in an optional parameter, we might not even
>>>>>>>>> need to deprecate the existing `commitTransaction()` method?
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>> On 6/20/24 9:12 AM, Andrew Schofield wrote:
>>>>>>>>>> Hi Alieh,
>>>>>>>>>> Thanks for the KIP.
>>>>>>>>>>
>>>>>>>>>> I *really* don’t like adding a config which changes the behaviour
>>>>>>>>>> of the flush() method. We already have too many configs. But I
>>>>>>>>>> totally understand the problem that you’re trying to solve and some
>>>>>>>>>> of the other suggestions in this thread seem neater.
>>>>>>>>>>
>>>>>>>>>> Personally, I would add another method to KafkaProducer. Not an
>>>>>>>>>> overload on flush() because this is not flush() at all. Using
>>>>>>>>>> Matthias’s options, I prefer (3).
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Andrew
>>>>>>>>>>
>>>>>>>>>>> On 20 Jun 2024, at 15:08, Lianet M. <liane...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi all, thanks for the KIP Alieh!
>>>>>>>>>>>
>>>>>>>>>>> LM1. Totally agree with Artem's point about the config not being
>>>>>>>>>>> the most explicit/flexible way to express this capability. Getting
>>>>>>>>>>> then to Matthias 4 options, what I don't like about 3 and 4 is that
>>>>>>>>>>> it seems they might not age very well?
>>>>>>>>>>> Aren't we going to be wanting some other twist to the flush
>>>>>>>>>>> semantics that will have us adding yet another param to it, or
>>>>>>>>>>> another overloaded method? I truly don't have the context to answer
>>>>>>>>>>> that, but if it feels like a realistic future maybe adding some kind
>>>>>>>>>>> of FlushOptions param to the flush would be better from an
>>>>>>>>>>> extensibility point of view. It would only have the clearErrors
>>>>>>>>>>> option available for now but could accept any other we may need. I
>>>>>>>>>>> find that this would remove the "ugliness" Matthias pointed out for
>>>>>>>>>>> 3. and 4.
>>>>>>>>>>>
>>>>>>>>>>> LM2. No matter how we end up expressing the different semantics
>>>>>>>>>>> for flush, let's make sure we update the KIP on the flush and
>>>>>>>>>>> commitTransaction java docs. It currently states that flush "clears
>>>>>>>>>>> the last exception" and commitTransaction "will NOT throw" if called
>>>>>>>>>>> after flush, but it really all depends on the config/options/method
>>>>>>>>>>> used.
>>>>>>>>>>>
>>>>>>>>>>> LM3. I find it would be helpful to include an example to show the
>>>>>>>>>>> new flow that we're unblocking (I see this as the great gain here):
>>>>>>>>>>> flush with clear error option enabled -> catch and do whatever error
>>>>>>>>>>> handling we want -> commitTransaction successfully
>>>>>>>>>>>
>>>>>>>>>>> Thanks!
>>>>>>>>>>>
>>>>>>>>>>> Lianet
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jun 19, 2024 at 11:26 PM Chris Egerton
>>>>>>>>>>> <fearthecel...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>
>>>>>>>>>>>> I like the alternatives you've listed.
>>>>>>>>>>>> One more that might help is if, instead of overloading flush(), we
>>>>>>>>>>>> overloaded commitTransaction() to something like
>>>>>>>>>>>> commitTransaction(boolean tolerateRecordErrors). This seems
>>>>>>>>>>>> slightly cleaner in that it takes the behavioral change we want,
>>>>>>>>>>>> which only applies to transactional producers, to an API method that
>>>>>>>>>>>> is only used for transactional producers. It would also avoid the
>>>>>>>>>>>> issue of whether or not flush() (or a new variant of it with altered
>>>>>>>>>>>> semantics) should throw or not. Thoughts?
>>>>>>>>>>>>
>>>>>>>>>>>> Hi Alieh,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the KIP, I like this direction a lot more than the
>>>>>>>>>>>> pluggable handler!
>>>>>>>>>>>>
>>>>>>>>>>>> I share Artem's concerns that enabling this behavior via
>>>>>>>>>>>> configuration doesn't seem like a great fit. It's likely that
>>>>>>>>>>>> application code will be written in a style that only works with one
>>>>>>>>>>>> type of behavior from transactional producers, so requiring that
>>>>>>>>>>>> application code to declare its expectations for the behavior of its
>>>>>>>>>>>> producer seems more appropriate than, e.g., allowing users deploying
>>>>>>>>>>>> that application to tweak a configuration file that gets fed to
>>>>>>>>>>>> producers spun up inside it.
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>
>>>>>>>>>>>> Chris
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jun 19, 2024 at 10:32 PM Matthias J. Sax <mj...@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the KIP Alieh. I actually like the KIP as-is, but think
>>>>>>>>>>>>> Artem raises very good points...
>>>>>>>>>>>>>
>>>>>>>>>>>>> Seems we have four options on how to move forward?
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. add config to allow "silent error clearance" as the KIP proposes
>>>>>>>>>>>>> 2. change flush() to clear error and let it throw
>>>>>>>>>>>>> 3. add new `flushAndThrow()` (or better name) which clears error
>>>>>>>>>>>>>    and throws
>>>>>>>>>>>>> 4. add `flush(boolean clearAndThrow)` and let user pick (and
>>>>>>>>>>>>>    deprecate existing `flush()`)
>>>>>>>>>>>>>
>>>>>>>>>>>>> For (2), given that it would be a behavior change, we might also
>>>>>>>>>>>>> need a public "feature flag" config.
>>>>>>>>>>>>>
>>>>>>>>>>>>> It seems, both (1) and (2) have the issue Artem mentioned. (3) and
>>>>>>>>>>>>> (4) would be safer to this end, however, for both we kinda get an
>>>>>>>>>>>>> ugly API?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Not sure right now if I have any preference. Seems we need to pick
>>>>>>>>>>>>> some evil and that there is no clear best solution? Would be good to
>>>>>>>>>>>>> hear from others what they think.
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 6/18/24 8:39 PM, Artem Livshits wrote:
>>>>>>>>>>>>>> Hi Alieh,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thank you for the KIP. I have a couple of suggestions:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> AL1. We should throw an error from flush after we clear it. This
>>>>>>>>>>>>>> would make it so that both "send + commit" and "send + flush +
>>>>>>>>>>>>>> commit" (the latter looks like just a more verbose way to express
>>>>>>>>>>>>>> the former, and it would be intuitive if it behaves the same) would
>>>>>>>>>>>>>> throw if the transaction has an error (so if the code is written
>>>>>>>>>>>>>> either way it's going to be correct). At the same time, the latter
>>>>>>>>>>>>>> could be extended by the caller to intercept exceptions from flush,
>>>>>>>>>>>>>> ignore as needed, and commit the transaction.
>>>>>>>>>>>>>> This solution would keep basic things simple (if someone has code
>>>>>>>>>>>>>> that doesn't require advanced error handling, then basic "send +
>>>>>>>>>>>>>> flush + commit" would do the right thing) and advanced things
>>>>>>>>>>>>>> possible: an application can add try + catch around flush and
>>>>>>>>>>>>>> ignore some errors.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> AL2. I'm not sure if config is the best way to express the
>>>>>>>>>>>>>> modification of the "flush" semantics -- the application logic that
>>>>>>>>>>>>>> calls "flush" needs to match the "flush" semantics and configuring
>>>>>>>>>>>>>> semantics in a detached place creates room for bugs due to
>>>>>>>>>>>>>> discrepancies. This can be especially bad if the producer loads
>>>>>>>>>>>>>> configuration from a file at run time, in that case a mistake in
>>>>>>>>>>>>>> configuration could break the application because it was written
>>>>>>>>>>>>>> to expect one "flush" semantics but the semantics is switched.
>>>>>>>>>>>>>> Given that the "flush" semantics needs to match the caller's
>>>>>>>>>>>>>> expectation, a way to accomplish that would be to pass the caller's
>>>>>>>>>>>>>> expectation to the "flush" call by either having a method with a
>>>>>>>>>>>>>> different name or having an overload with a Boolean flag that would
>>>>>>>>>>>>>> configure the semantics (the current method could just redirect to
>>>>>>>>>>>>>> the new one).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -Artem
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Jun 17, 2024 at 9:09 AM Alieh Saeedi
>>>>>>>>>>>>>> <asae...@confluent.io.invalid> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'd like to kick off a discussion for KIP-1059 that suggests
>>>>>>>>>>>>>>> adding a new feature to the Producer flush() method.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1059%3A+Enable+the+Producer+flush%28%29+method+to+clear+the+latest+send%28%29+error
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>> Alieh
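
For reference, the flow this thread converges on (send() throws synchronously
for a client-side error, the application decides whether to continue or abort,
and the transaction is not poisoned) might be sketched in plain Java as below.
This is only an illustration under stated assumptions: `ToyProducer`,
`Response`, and the local `RecordTooLargeException` are hypothetical stand-ins
and not the Kafka client classes, and the size check uses string length rather
than serialized bytes.

```java
import java.util.ArrayList;
import java.util.List;

public class SendErrorSketch {
    /** Hypothetical stand-in for a client-side "record too large" failure (not the Kafka class). */
    static class RecordTooLargeException extends RuntimeException {
        RecordTooLargeException(String message) { super(message); }
    }

    /** Hypothetical handler decision, mirroring the CONTINUE idea mentioned in the thread. */
    public enum Response { CONTINUE, FAIL }

    /** Toy producer: send() throws synchronously and leaves the transaction usable. */
    static class ToyProducer {
        private final int maxLength;
        private final List<String> pending = new ArrayList<>();
        private final List<String> committed = new ArrayList<>();

        ToyProducer(int maxLength) { this.maxLength = maxLength; }

        void send(String value) {
            if (value.length() > maxLength) {
                // The record is rejected before it is added; no error state is set.
                throw new RecordTooLargeException("length " + value.length() + " > " + maxLength);
            }
            pending.add(value);
        }

        void commitTransaction() { committed.addAll(pending); pending.clear(); }
        void abortTransaction() { pending.clear(); }
        List<String> committed() { return committed; }
    }

    /** Sends all records in one toy transaction; returns what ended up committed. */
    public static List<String> process(List<String> records, int maxLength, Response onTooLarge) {
        ToyProducer producer = new ToyProducer(maxLength);
        for (String record : records) {
            try {
                producer.send(record);
            } catch (RecordTooLargeException e) {
                if (onTooLarge == Response.FAIL) {
                    producer.abortTransaction();
                    return producer.committed();
                }
                // CONTINUE: drop the oversized record and keep going.
            }
        }
        producer.commitTransaction();
        return producer.committed();
    }

    public static void main(String[] args) {
        System.out.println(process(List.of("a", "not sure", "b"), 1, Response.CONTINUE)); // prints [a, b]
        System.out.println(process(List.of("a", "not sure", "b"), 1, Response.FAIL));     // prints []
    }
}
```

With CONTINUE the oversized record is dropped and the remaining records are
committed; with FAIL the transaction is aborted and nothing is committed. How
the real producer should behave here is exactly what the thread leaves open.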