Hi all,
thanks for the brilliant ideas.

The KIP is updated as follows:

-Regarding the word "latest”: I chose this word because in documentation of
the `commitTnx()`, it is clearly mentioned that the method throws the lates
exception of `send()`. BTW, I agree with you to change it to “any” since
“latest” can be confusing. The user may think, what about the pre-last
exception?!


-KIP name: updated to “Enable Producer to resolve send() method errors”.
Agree?


-Map is replaced by enum `CommitOption` as Andrew suggested. Thanks for all
the suggestions. I knew Map is not the best choice but hesitated to define
something new.


-The method javadocs: updated

Looking forward to your votes.

Bests,
Alieh

On Wed, Jun 26, 2024 at 10:52 AM Andrew Schofield <andrew_schofi...@live.com>
wrote:

> Hi,
> Looking at your suggestions for the CommitOptions, I would be happy with
> either.
> I definitely prefer to the Map<String, ?> in the KIP.
>
> We also need to think about the other option where CLEAR_SEND_ERRORS
> hasn’t been
> specified, and leave ourselves space for other options in the future.
>
> If we use an enum, we need to give the existing behaviour a name. Maybe:
>
>    public enum CommitOptions {
>       /**
>        * Commits the ongoing transaction, flushing any unsent records
> before
>        * actually committing the transaction. If any of the records sent
> in this transaction
>        * hit unrecoverable errors, the transaction will not be committed.
>        */
>      NONE,
>
>       /**
>        *  Commits the ongoing transaction, first clearing any errors from
> records already sent
>        *  in this transaction and then flushing any unsent records before
> committing the transaction.
>        *  If there are any unsent records flushed by this operation which
> hit unrecoverable errors,
>        *  these errors will not be cleared and the transaction will not be
> committed.
>        *  <p>
>        *  To ensure there are no unsent records, you must call {@link
> #flush()} before
>        *  committing the transaction.
>        */
>       CLEAR_SEND_ERRORS;
>    }
>
> The Javadoc for commitTransaction will have to be careful crafted.
>
> Thanks,
> Andrew
>
>
> > On 26 Jun 2024, at 03:23, Matthias J. Sax <mj...@apache.org> wrote:
> >
> >>> I was also curious about this text:
> >>>> The new method clears the latest error produced by
> `send(ProducerRecord)`
> >>> and transits the transaction back from the error state
> >
> > I agree. We should not say "latest" but "any".
> >
> >
> >
> >> Is it fair to say that we expect to encounter only one send response
> with
> >> an error that we clear on commitTransaction? Is that solving the
> problem?
> >
> > Yes, I think that is an accurate description.
> >
> >
> >> Speaking of documentation, is it confusing that the name of the KIP is
> no
> >> longer consistent with the approach the KIP takes?
> >
> > Sounds like an easy fix (we should not update the subject of the discuss
> email thread thought...)
> >
> >
> >
> >> looks like we said on the KIP that the options will be of the type
> Map<String,
> >> ?> commitOptions. Would we want to define something more specific?
> >
> > I would also suggest to either pass an enum (as preferred by Andrew) or
> a newly define class `CommitOptions`.
> >
> > Something like:
> >
> >    public enum CommitOptions {
> >        /**
> >         * Cleans any previous send errors, before committing the
> transaction.
> >         * Note, if there are pending sends, error won't be cleared.
> >         * To avoid pending sends, you need to call {@link #flush()}
> before committing the transaction.
> >         */
> >        CLEAR_SEND_ERRORS
> >    }
> >
> >
> > Or similar for a class:
> >
> >    public class CommitOptions {
> >        /**
> >         * Cleans any previous send errors, before committing the
> transaction.
> >         * Note, if there are pending sends, error won't be cleared.
> >         * To avoid pending sends, you need to call {@link #flush()}
> before committing the transaction.
> >         */
> >        public static CommitOptions clearSendErrors();
> >    }
> >
> > Plus:
> >
> >    public class KafkaProducer {
> >
> >        /**
> >         * <same JavaDocs as for commitTransaction()>
> >         *
> >         * This method should only be called if there are no pending
> writes, ie, only after calling {@link #flush()).
> >         * If there are any errors sending messaged to topics, these
> errors can be cleared by passing {@link CommitOptions#clearSendErrors},
> allowing the transaction to commit even in case of data loss during.
> >         *
> >         * If this method is used while there are pending sends,
> >         * send error cannot be cleared.
> >         */
> >        public void commitTransaction(CommitOptions options);
> >    }
> >
> > Or something like this -- we can discuss details about JavaDocs on the
> PR IMHO.
> >
> >
> >> What I
> >> really don't like (in any of the options), is that we cannot really
> >> document it in a way that articulates a value in the product.  There are
> >> tons of nuances that require understanding some buggy behavior, then
> fixed
> >> behavior, then an option to sometimes turn on buggy behavior, and etc.
> >
> > I would believe that users don't need to understand the full history of
> events, and the propose JavaDocs should go a long way.
> >
> >
> >
> > -Matthias
> >
> >
> > On 6/25/24 5:00 PM, Justine Olshan wrote:
> >> Hey there,
> >> I had a few questions about the update.
> >> Looks like we said on the KIP that the options will be of the type
> Map<String,
> >> ?> commitOptions. Would we want to define something more specific?
> >> I was also curious about this text:
> >>> The new method clears the latest error produced by
> `send(ProducerRecord)`
> >> and transits the transaction back from the error state
> >> Is it fair to say that we expect to encounter only one send response
> with
> >> an error that we clear on commitTransaction? Is that solving the
> problem?
> >> I think we also need to be really careful about the documentation of
> this
> >> method. It should be clear that setting this option will not do
> anything if
> >> there is any inflight record when the method is called.
> >> Speaking of documentation, is it confusing that the name of the KIP is
> no
> >> longer consistent with the approach the KIP takes?
> >> Thanks,
> >> Justine
> >> On Tue, Jun 25, 2024 at 5:08 AM Alieh Saeedi
> <asae...@confluent.io.invalid>
> >> wrote:
> >>> Hi all,
> >>>
> >>> Appreciation for maintaining the momentum of our discussion.
> >>>
> >>>
> >>> I see kinda consensus over the main points. It seems that we agreed on
> the
> >>> following:
> >>>
> >>> 1) Define the `commitTnx(commitOptions)` to clear the error.
> >>>
> >>> 2) Make the user explicitly call  `flush()` before
> >>> `commitTnx(commitOptions)`, if he determines ignoring errors.
> >>>
> >>>
> >>> I updated the KIP with the above-mentioned points. Please take a look.
> I am
> >>> sure it is not perfect yet, and there are/will be some open questions,
> but
> >>> if you agree, I will open voting as well. Of course, the discussion can
> >>> still carry on in this thread.
> >>>
> >>>
> >>> Cheers,
> >>>
> >>> Alieh
> >>>
> >>> On Tue, Jun 25, 2024 at 11:36 AM Chris Egerton <
> fearthecel...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi Artem,
> >>>>
> >>>> Yes, I completely agree that by default, special action shouldn't be
> >>>> required from users to prevent transactions from being committed when
> one
> >>>> or more records can't be sent. The behavior I was suggesting was only
> >>>> relevant to the new API we're discussing where we allow users to
> >>>> intentionally bypass that logic when invoking commitTransaction.
> >>>>
> >>>> Cheers,
> >>>>
> >>>> Chris
> >>>>
> >>>> On Tue, Jun 25, 2024, 01:44 Artem Livshits <alivsh...@confluent.io
> >>>> .invalid>
> >>>> wrote:
> >>>>
> >>>>> Hey folks,
> >>>>>
> >>>>> Great discussion!
> >>>>>
> >>>>> Re: throwing exceptions from send().  send() is documented to throw
> >>>>> KafkaException, so if the application doesn't handle it, it should
> be a
> >>>>> bug.  Now, it does have a note that API exceptions wouldn't be
> thrown,
> >>>> not
> >>>>> sure if we have code that relies on that.  There is a reason
> exceptions
> >>>>> have classes, they are designed to express a "class of errors" that
> can
> >>>> be
> >>>>> handled, so that we don't have to add a flag or a new method every
> time
> >>>> we
> >>>>> have a new exception to throw.  But if there is consensus that it's
> >>> still
> >>>>> too risky (especially if we have examples of code that gets broken),
> >>>> then I
> >>>>> agree that we shouldn't do it.
> >>>>>
> >>>>> Re: various ways to communicate semantics change.  If we must have 2
> >>>>> different behaviors, I think passing options to "ignore errors" to
> >>>>> commitTransaction is probably the most intuitive way to do it.  What
> I
> >>>>> really don't like (in any of the options), is that we cannot really
> >>>>> document it in a way that articulates a value in the product.  There
> >>> are
> >>>>> tons of nuances that require understanding some buggy behavior, then
> >>>> fixed
> >>>>> behavior, then an option to sometimes turn on buggy behavior, and
> etc.
> >>>>>
> >>>>>>  if a user invokes Producer::abortTransaction from within a producer
> >>>>> callback today
> >>>>>
> >>>>> I think we would get invalid state exception.  Which we could
> probably
> >>>> fix,
> >>>>> but even if we supported it, I think it would be good if doing send +
> >>>>> commit would lead to aborted transaction without special action from
> >>> the
> >>>>> application -- the simple things should be really simple, any failure
> >>>>> during send or commit should abort send + commit sequence without
> >>> special
> >>>>> handling.
> >>>>>
> >>>>> -Artem
> >>>>>
> >>>>> On Mon, Jun 24, 2024 at 6:37 PM Chris Egerton <
> fearthecel...@gmail.com
> >>>>
> >>>>> wrote:
> >>>>>
> >>>>>> One quick thought: if a user invokes Producer::abortTransaction from
> >>>>> within
> >>>>>> a producer callback today, even in the midst of an ongoing call to
> >>>>>> Producer::commitTransaction, what is the behavior? Would it be
> >>>> reasonable
> >>>>>> to support this behavior as a way to allow error handling to take
> >>> place
> >>>>>> during implicit flushes, via producer callback?
> >>>>>>
> >>>>>> On Mon, Jun 24, 2024 at 9:21 PM Matthias J. Sax <mj...@apache.org>
> >>>>> wrote:
> >>>>>>
> >>>>>>> My point it, that it does not seem to be safe to allow users to
> >>>> ignore
> >>>>>>> errors with an implicit flush, and I think it's better to only
> >>> allow
> >>>> it
> >>>>>>> with (ie, after) an explicit flush().
> >>>>>>>
> >>>>>>> My reasoning is, that users should make a decision to ignore errors
> >>>> or
> >>>>>>> not, before calling `commitTx()`, but after inspecting all
> >>> potential
> >>>>>>> send errors. With an implicit flush, users need to "blindly" decide
> >>>> to
> >>>>>>> ignore send errors, because there are pending sends and potential
> >>>>> errors
> >>>>>>> are not known yet, when calling `commitTx()`.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>> In the documentation of commitTransaction, we say if any send
> >>>> throws
> >>>>> an
> >>>>>>>> error, commitTransaction will too.
> >>>>>>>
> >>>>>>> Yes. And I think we should keep it this way for an implicit flush.
> >>>> With
> >>>>>>> an explicit flush, `commitTransaction()` cannot encounter any send
> >>>>>>> errors any longer.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>> It says that all callbacks will be executed, but we ignore the
> >>>> errors
> >>>>>> of
> >>>>>>>> the callbacks.
> >>>>>>>
> >>>>>>> Ah. Thanks for pointing this out. For this case it's even worse
> >>> (for
> >>>>>>> case (2)), because the user cannot inspect any errors and make any
> >>>>>>> decision to ignore or not during an implicit flush...
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>> We shouldn't be relying on errors in the callback unless we are
> >>>>>>>> calling flush, which we can still do. It seems this has always
> >>> been
> >>>>> the
> >>>>>>>> case as well.
> >>>>>>>
> >>>>>>> Yes, has always been this way, and my point is to keep it this way
> >>>>>>> (option (2) would change it), and not start to allow to ignore
> >>> errors
> >>>>>>> with an implicit flush.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On 6/24/24 4:57 PM, Justine Olshan wrote:
> >>>>>>>> Transaction verification is a concept from KIP-890 referring to
> >>> the
> >>>>>>>> verification that a partition has been added to the transaction.
> >>>> It's
> >>>>>>> not a
> >>>>>>>> huge deal, but maybe we don't want to overload the terminology.
> >>>>>>>>
> >>>>>>>> For option 2, I was a little confused by this
> >>>>>>>>
> >>>>>>>>>    when commitTx is called, there is still pending Futures and
> >>> not
> >>>>>>>> all Callbacks are executed yet -- with the implicit flush, we
> >>> know
> >>>>> that
> >>>>>>>> all Callbacks are executed, but even for this case, the user
> >>> could
> >>>>> only
> >>>>>>>> throw an exception inside the Callback to stop the TX to
> >>> eventually
> >>>>>>>> commit -- Futures cannot be used to make a decision to ignore
> >>> error
> >>>>> and
> >>>>>>>> commit or not.
> >>>>>>>>
> >>>>>>>> In the documentation of commitTransaction, we say if any send
> >>>> throws
> >>>>> an
> >>>>>>>> error, commitTransaction will too.
> >>>>>>>>
> >>>>>>>> *Further, if any of the {@link #send(ProducerRecord)} calls which
> >>>>> were
> >>>>>>> part
> >>>>>>>> of the transaction hit irrecoverable errors, this method will
> >>> throw
> >>>>> the
> >>>>>>>> last received exception immediately and the transaction will not
> >>> be
> >>>>>>>> committed.*
> >>>>>>>>
> >>>>>>>> It says that all callbacks will be executed, but we ignore the
> >>>> errors
> >>>>>> of
> >>>>>>>> the callbacks.
> >>>>>>>>
> >>>>>>>> *If the transaction is committed successfully and this method
> >>>> returns
> >>>>>>>> without throwing an exception, it is guaranteed that all {@link
> >>>>>> Callback
> >>>>>>>> callbacks} for records in the transaction will have been invoked
> >>>> and
> >>>>>>>> completed. Note that exceptions thrown by callbacks are ignored;
> >>>> the
> >>>>>>>> producer proceeds to commit the transaction in any case.*
> >>>>>>>>
> >>>>>>>> Is it fair to say though that for the send errors, we can choose
> >>> to
> >>>>>>> ignore
> >>>>>>>> them? II wasn't understanding where the callbacks come in with
> >>> your
> >>>>>>>> comment. We shouldn't be relying on errors in the callback unless
> >>>> we
> >>>>>> are
> >>>>>>>> calling flush, which we can still do. It seems this has always
> >>> been
> >>>>> the
> >>>>>>>> case as well.
> >>>>>>>>
> >>>>>>>> Justine
> >>>>>>>>
> >>>>>>>> On Mon, Jun 24, 2024 at 11:07 AM Andrew Schofield <
> >>>>>>> andrew_schofi...@live.com>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Agreed. Options 1 and 3 are safe. Option 2 is not. I’d be happy
> >>>> with
> >>>>>> 3a
> >>>>>>> as
> >>>>>>>>> the way.
> >>>>>>>>>
> >>>>>>>>> I suggest “TRANSACTION VERIFIED”.
> >>>>>>>>>
> >>>>>>>>> There isn’t really precedent for options in the producer API. We
> >>>>> could
> >>>>>>> use
> >>>>>>>>> an enum,
> >>>>>>>>> which is easy to use and not very future-proof. Or we could use
> >>> a
> >>>>>> class
> >>>>>>>>> like the
> >>>>>>>>> admin API does, which is cumbersome and flexible.
> >>>>>>>>>
> >>>>>>>>>    CommitTransactionOptions.TRANSACTION_VERIFIED
> >>>>>>>>>
> >>>>>>>>> or
> >>>>>>>>>
> >>>>>>>>>    public class CommitTransactionOptions {
> >>>>>>>>>      public CommitTransactionOptions();
> >>>>>>>>>
> >>>>>>>>>      CommitTransactionOptions transactionVerified(boolean
> >>>>>>>>> transactionVerified);
> >>>>>>>>>
> >>>>>>>>>      boolean transactionVerified();
> >>>>>>>>>    }
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Then 3b is:
> >>>>>>>>>
> >>>>>>>>>     send(…)
> >>>>>>>>>     send(…)
> >>>>>>>>>     flush()
> >>>>>>>>>     commitTransaction(new
> >>>>>>>>> CommitTransactionOptions().transactionVerified(true))
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> I’d tend towards the enum here because I doubt we need as much
> >>>>>>> flexibility
> >>>>>>>>> as the admin API requires.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Andrew
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>> On 24 Jun 2024, at 18:39, Matthias J. Sax <mj...@apache.org>
> >>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>> I am ok either way (ie, flush or commit), but I think we need
> >>> to
> >>>>>> define
> >>>>>>>>> exact semantics, and I think there is some subtle thing to
> >>>> consider:
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> 1) flush(Options)
> >>>>>>>>>>
> >>>>>>>>>> Example:
> >>>>>>>>>>
> >>>>>>>>>>   send(...)
> >>>>>>>>>>   send(...)
> >>>>>>>>>>
> >>>>>>>>>>   flush(ignoreErrors)
> >>>>>>>>>>
> >>>>>>>>>>   // at this point, we know that all Futures are completed and
> >>>> all
> >>>>>>>>> Callbacks are executed, and we can assume that all user code
> >>>>> checking
> >>>>>>> for
> >>>>>>>>> errors did execute, before `commitTx` is called
> >>>>>>>>>>
> >>>>>>>>>>   // I consider this option as safe
> >>>>>>>>>>
> >>>>>>>>>>   commitTx()
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> 2) commitTx(Option)
> >>>>>>>>>>
> >>>>>>>>>> Example:
> >>>>>>>>>>
> >>>>>>>>>>   send(...)
> >>>>>>>>>>   send(...)
> >>>>>>>>>>
> >>>>>>>>>>   // when commitTx is called, there is still pending Futures
> >>> and
> >>>>> not
> >>>>>>> all
> >>>>>>>>> Callbacks are executed yet -- with the implicit flush, we know
> >>>> that
> >>>>>> all
> >>>>>>>>> Callbacks are executed, but even for this case, the user could
> >>>> only
> >>>>>>> throw
> >>>>>>>>> an exception inside the Callback to stop the TX to eventually
> >>>> commit
> >>>>>> --
> >>>>>>>>> Futures cannot be used to make a decision to ignore error and
> >>>> commit
> >>>>>> or
> >>>>>>> not.
> >>>>>>>>>>
> >>>>>>>>>>   // I consider this option not as safe
> >>>>>>>>>>
> >>>>>>>>>>   commitTx(igrnoreErrors)
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> 3a) required flush + commitTx(Option)
> >>>>>>>>>>
> >>>>>>>>>> Example:
> >>>>>>>>>>
> >>>>>>>>>>   send(...)
> >>>>>>>>>>   send(...)
> >>>>>>>>>>
> >>>>>>>>>>   flush()
> >>>>>>>>>>
> >>>>>>>>>>   // at this point, we know that all Future are completed and
> >>> all
> >>>>>>>>> Callbacks are executed, and we can assume that all user code
> >>>>> checking
> >>>>>>> for
> >>>>>>>>> error did execute, before `commitTx` is called
> >>>>>>>>>>
> >>>>>>>>>>   // I consider this option as safe
> >>>>>>>>>>
> >>>>>>>>>>   commitTx(ignoreErrors)
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> 3b) missing flush + commitTx(Option)
> >>>>>>>>>>
> >>>>>>>>>> Example:
> >>>>>>>>>>
> >>>>>>>>>>   send(...)
> >>>>>>>>>>   send(...)
> >>>>>>>>>>
> >>>>>>>>>>   // as flush() was not called explicitly, we should ignore
> >>>>>>>>> `ignoreErrors` flag and always throw an exception if the
> >>> producer
> >>>> is
> >>>>>> in
> >>>>>>>>> error state, because we cannot be sure that the user did all
> >>>>> required
> >>>>>>> check
> >>>>>>>>> for error handling
> >>>>>>>>>>
> >>>>>>>>>>   commitTx(ignoreErrors)
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> The only issue with option (3) is, that it's more complex and
> >>>>>> semantics
> >>>>>>>>> are more subtle. But it might be the a good (necessary?) bridge
> >>>>>> between
> >>>>>>> (1)
> >>>>>>>>> and (2): (3) is semantically sound (we ignore errors via
> >>> passing a
> >>>>>> flag
> >>>>>>>>> into commitTx() instead of flush()), and at the same time safe
> >>> (we
> >>>>>> force
> >>>>>>>>> users to explicitly flush() and [hopefully] do proper error
> >>>>> handling,
> >>>>>>> and
> >>>>>>>>> don't rely in am implicit flush() during commitTx() which might
> >>> be
> >>>>>> error
> >>>>>>>>> prone).
> >>>>>>>>>>
> >>>>>>>>>> (Also need to find a good and descriptive name for the flag we
> >>>> pass
> >>>>>>> into
> >>>>>>>>> `commitTx()` for this case.)
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On 6/24/24 8:51 AM, Andrew Schofield wrote:
> >>>>>>>>>>> Hi Chris,
> >>>>>>>>>>> That works for me too. I slightly prefer an option on flush(),
> >>>> but
> >>>>>>> what
> >>>>>>>>> you suggested
> >>>>>>>>>>> works too.
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>> Andrew
> >>>>>>>>>>>> On 24 Jun 2024, at 15:14, Chris Egerton
> >>>> <chr...@aiven.io.INVALID
> >>>>>>
> >>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Hi Andrew,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I like a lot of what you said, but I still believe it's
> >>> better
> >>>> to
> >>>>>>>>> override
> >>>>>>>>>>>> commitTransaction than flush. Users will already have to
> >>>> manually
> >>>>>> opt
> >>>>>>>>> in to
> >>>>>>>>>>>> ignoring errors encountered during transactions, and we can
> >>>>>> document
> >>>>>>>>>>>> recommended usage (i.e., explicitly invoking flush() before
> >>>>>> invoking
> >>>>>>>>>>>> commitTransaction(ignoreRecordErrors)) in the
> >>> newly-introduced
> >>>>>>> method.
> >>>>>>>>> I
> >>>>>>>>>>>> don't believe it's worth the increased cognitive load on
> >>> users
> >>>>> with
> >>>>>>>>>>>> non-transactional producers to introduce an overloaded
> >>> flush()
> >>>>>>> variant.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Chris
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Mon, Jun 24, 2024 at 9:39 AM Andrew Schofield <
> >>>>>>>>> andrew_schofi...@live.com>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Alieh,
> >>>>>>>>>>>>> Thanks for driving this. Unfortunately, there are many parts
> >>>> of
> >>>>>> the
> >>>>>>>>> API
> >>>>>>>>>>>>> which
> >>>>>>>>>>>>> are a bit unfortunate and it’s tricky to make small
> >>>> improvements
> >>>>>>> that
> >>>>>>>>>>>>> don’t have
> >>>>>>>>>>>>> downsides.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I don’t like the idea of using a configuration because
> >>>>>> configuration
> >>>>>>>>> is
> >>>>>>>>>>>>> often
> >>>>>>>>>>>>> outside the application and changing the behaviour of
> >>> someone
> >>>>>> else’s
> >>>>>>>>>>>>> application
> >>>>>>>>>>>>> without understanding it is risky. Anything which embeds a
> >>>>>>>>> transactional
> >>>>>>>>>>>>> producer
> >>>>>>>>>>>>> could have its behaviour changed unexpectedly.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> It would be been much nicer if send() didn’t fail silently
> >>> and
> >>>>>>> change
> >>>>>>>>> the
> >>>>>>>>>>>>> transaction
> >>>>>>>>>>>>> state. But, because it’s an asynchronous operation, I don’t
> >>>>> really
> >>>>>>>>> think
> >>>>>>>>>>>>> we can
> >>>>>>>>>>>>> just make it throw all exceptions, even though I really
> >>> think
> >>>>> that
> >>>>>>>>>>>>> `send()` is the
> >>>>>>>>>>>>> method with the problem here.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The contract of `flush()` is that it makes sure that all
> >>>>> preceding
> >>>>>>>>> sends
> >>>>>>>>>>>>> will have
> >>>>>>>>>>>>> completed, so it should be true that a well written
> >>>> application
> >>>>>>> would
> >>>>>>>>> be
> >>>>>>>>>>>>> able to
> >>>>>>>>>>>>> know which records were OK because of the
> >>>> Future<RecordMetadata>
> >>>>>>>>> returned
> >>>>>>>>>>>>> by the `send()` method. It should be able to determine
> >>> whether
> >>>>> it
> >>>>>>>>> wants to
> >>>>>>>>>>>>> commit
> >>>>>>>>>>>>> the transaction even if some of the intended operations
> >>> didn’t
> >>>>>>>>> succeed.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> What we don’t currently have is a way for the application to
> >>>> say
> >>>>>> to
> >>>>>>>>> the
> >>>>>>>>>>>>> KafkaProducer
> >>>>>>>>>>>>> that it knows the outcome of sending the records and to
> >>>> confirm
> >>>>>> that
> >>>>>>>>> it
> >>>>>>>>>>>>> wants to proceed.
> >>>>>>>>>>>>> Then it would not be necessary for `commitTransaction()` to
> >>>>> throw
> >>>>>> an
> >>>>>>>>>>>>> exception to
> >>>>>>>>>>>>> report a historical error which the application might choose
> >>>> to
> >>>>>>>>> ignore.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Having read the comments, I think the KIP is on the right
> >>>> lines
> >>>>>>>>> focusing
> >>>>>>>>>>>>> on the `flush()`
> >>>>>>>>>>>>> method. My suggestion is that we introduce an option on
> >>>>> `flush()`
> >>>>>> to
> >>>>>>>>> be
> >>>>>>>>>>>>> used before
> >>>>>>>>>>>>> `commitTransaction()` for applications that want to be able
> >>> to
> >>>>>>> commit
> >>>>>>>>>>>>> transactions which
> >>>>>>>>>>>>> had known failed operations.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The code would be:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>    producer.beginTransaction();
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>    future1 = producer.send(goodRecord1);
> >>>>>>>>>>>>>    future2 = producer.send(badRecord); // The future from
> >>> this
> >>>>>> call
> >>>>>>>>> will
> >>>>>>>>>>>>> complete exceptionally
> >>>>>>>>>>>>>    future3 = producer.send(goodRecord2);
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>    producer.flush(FlushOption.TRANSACTION_READY);
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>    // At this point, we know that all 3 futures are complete
> >>>> and
> >>>>>> the
> >>>>>>>>>>>>> transaction contains 2 records
> >>>>>>>>>>>>>    producer.commitTransaction();
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I wouldn’t deprecate `flush()` with no option. It just uses
> >>>> the
> >>>>>>>>> default
> >>>>>>>>>>>>> option which behaves
> >>>>>>>>>>>>> like today.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Why did I suggest an option on `flush()` rather than
> >>>>>>>>>>>>> `commitTransaction()`? Because with
> >>>>>>>>>>>>> `flush()`, it’s clear when the application is stating that
> >>>> it’s
> >>>>>> seen
> >>>>>>>>> all
> >>>>>>>>>>>>> of the results from its
> >>>>>>>>>>>>> `send()` calls and it’s ready to proceed. If it has to rely
> >>> on
> >>>>>>>>> flushing
> >>>>>>>>>>>>> that occurs inside
> >>>>>>>>>>>>> `commitTransaction()`, I don’t see it’s as clear-cut.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>> Andrew
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 24 Jun 2024, at 13:44, Alieh Saeedi
> >>>>>>> <asae...@confluent.io.INVALID
> >>>>>>>>>>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>> Thanks for the interesting discussion.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I assume that now the main questions are as follows:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1. Do we need to transit the transcation to the error state
> >>>> for
> >>>>>> API
> >>>>>>>>>>>>>> exceptions?
> >>>>>>>>>>>>>> 2. Should we throw the API exception in `send()` instead of
> >>>>>>>>> returning a
> >>>>>>>>>>>>>> future error?
> >>>>>>>>>>>>>> 3. If the answer to question (1) is NO and to question (2)
> >>> is
> >>>>>> YES,
> >>>>>>>>> do we
> >>>>>>>>>>>>>> need to change the current `flush` or `commitTnx` at all?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>> Alieh
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Sat, Jun 22, 2024 at 3:21 AM Matthias J. Sax <
> >>>>>> mj...@apache.org>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hey Kirk,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> can you elaborate on a few points?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Otherwise users would have to know to explicitly change
> >>>> their
> >>>>>>> code
> >>>>>>>>> to
> >>>>>>>>>>>>>>> invoke flush().
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Why? If we would add an option to `flush(FlushOption)`,
> >>> the
> >>>>>>> existing
> >>>>>>>>>>>>>>> `flush()` w/o any option will still be there, right? If we
> >>>>> would
> >>>>>>>>> really
> >>>>>>>>>>>>>>> deprecate existing `flush()`, it would just mean that we
> >>>> would
> >>>>>>> pass
> >>>>>>>>>>>>>>> "default FlushOption" into an implicit flush (and yes, we
> >>>>> would
> >>>>>>>>> need to
> >>>>>>>>>>>>>>> define what this would be).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I think there is no clear winner (as pointed out in my
> >>> last
> >>>>>>> reply),
> >>>>>>>>> and
> >>>>>>>>>>>>>>> both `flush(FlushOption)` and `commitTx(CommitOption)` has
> >>>>>>>>> advantages
> >>>>>>>>>>>>>>> and drawbacks. Guess we need to just agree on which
> >>> tradeoff
> >>>>> we
> >>>>>>>>> want to
> >>>>>>>>>>>>>>> move forward with?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Not sure if your database example is a 1:1 fit? I think,
> >>> the
> >>>>>>> better
> >>>>>>>>>>>>>>> comparison would be:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> BEGIN TX;
> >>>>>>>>>>>>>>> INSERT INTO foo VALUES (’a’);
> >>>>>>>>>>>>>>> INSERT INTO foo VALUES (’b’);
> >>>>>>>>>>>>>>> INSERT INTO foo VALUES (’c’);
> >>>>>>>>>>>>>>> INSERT INTO foo VALUES (’not sure’);
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> For this case, the full TX would roll back, right? I still
> >>>>> think
> >>>>>>>>> that
> >>>>>>>>>>>>>>> allowing users to just skip over the last error, and
> >>>> continue
> >>>>>> the
> >>>>>>> TX
> >>>>>>>>>>>>>>> would be ok. In the end, we provide a programmatic API,
> >>> and
> >>>>> not
> >>>>>> a
> >>>>>>>>>>>>>>> declarative one as SQL. Of course, default behavior would
> >>>>> still
> >>>>>> be
> >>>>>>>>> to
> >>>>>>>>>>>>>>> put the producer into error state, and the user would need
> >>>> to
> >>>>>> call
> >>>>>>>>>>>>>>> `abortTransaction()` to move forward.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 6/21/24 5:26 PM, Kirk True wrote:
> >>>>>>>>>>>>>>>> Hi Matthias,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Jun 21, 2024, at 12:28 PM, Matthias J. Sax <
> >>>>>> mj...@apache.org
> >>>>>>>>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> If we want to limit it to `RecordTooLargeException`
> >>>> throwing
> >>>>>>> from
> >>>>>>>>>>>>>>> `send()` directly make sense. Thanks for calling it out.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> It's still a question of backward compatibility?
> >>> `send()`
> >>>>> does
> >>>>>>>>> throw
> >>>>>>>>>>>>>>> exceptions already, including generic `KafkaException`.
> >>> Not
> >>>>> sure
> >>>>>>> if
> >>>>>>>>> this
> >>>>>>>>>>>>>>> helps with backward compatibility? Could we just add a new
> >>>>>>> exception
> >>>>>>>>>>>>> type
> >>>>>>>>>>>>>>> (which is a child of `KafkaException`)?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> The Producer JavaDocs are not totally explicit about it
> >>>>> IMHO.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I think we could expect that some generic error handling
> >>>>> path
> >>>>>>> gets
> >>>>>>>>>>>>>>> executed. For the TX-case, I would assume that a TX would
> >>> be
> >>>>>>>>> aborted if
> >>>>>>>>>>>>>>> `send()` throws or that the producer would be `closed()`.
> >>>>>> Overall
> >>>>>>>>> this
> >>>>>>>>>>>>>>> might be safe?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> It would be a little less flexible
> >>>>>>>>>>>>>>>>>>> though, since (as you note) it would still be
> >>> impossible
> >>>>> to
> >>>>>>>>> commit
> >>>>>>>>>>>>>>>>>>> transactions after errors have been reported from
> >>>> brokers.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> KS would still need a way to clear the error state of
> >>> the
> >>>>>>>>> producer. We
> >>>>>>>>>>>>>>> could catch a `RecordTooLargeException` from `send()`,
> >>> call
> >>>>> the
> >>>>>>>>> handler
> >>>>>>>>>>>>> and
> >>>>>>>>>>>>>>> let it decide what to do next. But if it does return
> >>>>> `CONTINUE`
> >>>>>> to
> >>>>>>>>>>>>> swallow
> >>>>>>>>>>>>>>> the error and drop the poison pill record on the floor, we
> >>>>> would
> >>>>>>>>> want to
> >>>>>>>>>>>>>>> move forward and commit the transaction.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> But the question is: if we cannot add a record to the
> >>> tx,
> >>>>> does
> >>>>>>> the
> >>>>>>>>>>>>>>> producer need to go into error state? In the end, we did
> >>>> throw
> >>>>>> and
> >>>>>>>>>>>>> inform
> >>>>>>>>>>>>>>> the app that the record was _not_ added, and it's up to
> >>> the
> >>>>> app
> >>>>>> to
> >>>>>>>>>>>>> decide
> >>>>>>>>>>>>>>> what to do next?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> That’s an excellent question…
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Imagine the user’s application is writing information to
> >>> a
> >>>>>>> database
> >>>>>>>>>>>>>>> instead of Kafka. If there’s a table with a CHAR(1) column
> >>>> and
> >>>>>>> this
> >>>>>>>>> SQL
> >>>>>>>>>>>>>>> statement was attempted, what should happen?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>     INSERT INTO foo VALUES (’not sure’);
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Yes, that DML would fail, sure, but would the user expect
> >>>>> that
> >>>>>>> the
> >>>>>>>>>>>>>>> connection used by database library would get stuck in
> >>> some
> >>>>> kind
> >>>>>>> of
> >>>>>>>>>>>>> error
> >>>>>>>>>>>>>>> state? A user would be able catch the error and either
> >>>>> continue
> >>>>>> or
> >>>>>>>>>>>>> abort,
> >>>>>>>>>>>>>>> based on their business rules.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> So I agree with what I believe you’re implying: we
> >>>> shouldn’t
> >>>>>>>>> poison the
> >>>>>>>>>>>>>>> Producer/TransactionManager on certain types of
> >>>>>> application-level
> >>>>>>>>>>>>> errors in
> >>>>>>>>>>>>>>> send().
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Kirk
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> If we report the error only via the `Callback` it's a
> >>>>>> different
> >>>>>>>>> story,
> >>>>>>>>>>>>>>> because the contract for this case is clearly specified on
> >>>> the
> >>>>>>>>> JavaDocs:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> When used as part of a transaction, it is not necessary
> >>>> to
> >>>>>>>>> define a
> >>>>>>>>>>>>>>> callback or check the result of the future
> >>>>>>>>>>>>>>>>>> in order to detect errors from <code>send</code>. If
> >>> any
> >>>> of
> >>>>>> the
> >>>>>>>>> send
> >>>>>>>>>>>>>>> calls failed with an irrecoverable error,
> >>>>>>>>>>>>>>>>>> the final {@link #commitTransaction()} call will fail
> >>> and
> >>>>>> throw
> >>>>>>>>> the
> >>>>>>>>>>>>>>> exception from the last failed send. When
> >>>>>>>>>>>>>>>>>> this happens, your application should call {@link
> >>>>>>>>>>>>> #abortTransaction()}
> >>>>>>>>>>>>>>> to reset the state and continue to send
> >>>>>>>>>>>>>>>>>> data.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On 6/21/24 11:42 AM, Chris Egerton wrote:
> >>>>>>>>>>>>>>>>>> Hi Artem,
> >>>>>>>>>>>>>>>>>> I think it'd make sense to throw directly from send
> >>>>> whenever
> >>>>>>>>>>>>> possible,
> >>>>>>>>>>>>>>>>>> instead of returning an already-completed future. I
> >>>> didn't
> >>>>> do
> >>>>>>>>> that in
> >>>>>>>>>>>>>>> my
> >>>>>>>>>>>>>>>>>> bug fix to try to be conservative about breaking
> >>> changes
> >>>>> but
> >>>>>>> this
> >>>>>>>>>>>>>>> seems to
> >>>>>>>>>>>>>>>>>> have caused its own set of headaches. It would be a
> >>>> little
> >>>>>> less
> >>>>>>>>>>>>>>> flexible
> >>>>>>>>>>>>>>>>>> though, since (as you note) it would still be
> >>> impossible
> >>>> to
> >>>>>>>>> commit
> >>>>>>>>>>>>>>>>>> transactions after errors have been reported from
> >>>> brokers.
> >>>>>>>>>>>>>>>>>> I'll leave it up to the Kafka Streams folks to decide
> >>> if
> >>>>> that
> >>>>>>>>>>>>>>> flexibility
> >>>>>>>>>>>>>>>>>> is required. If it is, then users could explicitly call
> >>>>>> flush()
> >>>>>>>>>>>>> before
> >>>>>>>>>>>>>>>>>> committing (and ignoring errors for) or aborting a
> >>>>>> transaction,
> >>>>>>>>> if
> >>>>>>>>>>>>> they
> >>>>>>>>>>>>>>>>>> want to implement fine-grained error handling logic
> >>> such
> >>>> as
> >>>>>>>>> allowing
> >>>>>>>>>>>>>>> errors
> >>>>>>>>>>>>>>>>>> for a subset of topics to be ignored.
> >>>>>>>>>>>>>>>>>> Hi Matthias,
> >>>>>>>>>>>>>>>>>> Most of the time you're right and we can't throw from
> >>>>> send();
> >>>>>>>>>>>>> however,
> >>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>> this case (client-side record-too-large exception), the
> >>>>> error
> >>>>>>> is
> >>>>>>>>>>>>>>> actually
> >>>>>>>>>>>>>>>>>> noticed by the producer before send() returns, so it
> >>>> should
> >>>>>> be
> >>>>>>>>>>>>>>> possible to
> >>>>>>>>>>>>>>>>>> throw directly.
> >>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>> Chris
> >>>>>>>>>>>>>>>>>> On Fri, Jun 21, 2024, 14:25 Matthias J. Sax <
> >>>>>> mj...@apache.org>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>> Not sure if we can change send and make it throw,
> >>> given
> >>>>> that
> >>>>>>>>> send()
> >>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>> async? That is why users can register a `Callback` to
> >>>>> begin
> >>>>>>>>> with,
> >>>>>>>>>>>>>>> right?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> And Alieh's point about backward compatibility is
> >>> also a
> >>>>>> fair
> >>>>>>>>>>>>> concern.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Actually, this would potentially be even
> >>>>>>>>>>>>>>>>>>>> worse than the original buggy behavior because the
> >>> bug
> >>>>> was
> >>>>>>>>> that we
> >>>>>>>>>>>>>>>>>>> ignored
> >>>>>>>>>>>>>>>>>>>> errors that happened in the "send()" method itself,
> >>> not
> >>>>>>>>> necessarily
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> ones that we got from the broker.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> My understanding was that `commitTx(swallowError)`
> >>> would
> >>>>>> only
> >>>>>>>>>>>>> swallow
> >>>>>>>>>>>>>>>>>>> `send()` errors, not errors about the actually
> >>> commit. I
> >>>>>> agree
> >>>>>>>>> that
> >>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>> would be very bad to swallow errors about the actual
> >>> tx
> >>>>>>>>> commit...
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> It's a fair question if this might be too subtle; to
> >>>> make
> >>>>> it
> >>>>>>>>>>>>> explicit,
> >>>>>>>>>>>>>>>>>>> we could use `CommitOpions#ignorePendingSendErors()`
> >>>>>> [working
> >>>>>>>>> name]
> >>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> make it clear.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> If we think it's too subtle to change commit to
> >>> swallow
> >>>>>> send()
> >>>>>>>>>>>>> errors,
> >>>>>>>>>>>>>>>>>>> maybe going with `flush(FlushOptions)` would be
> >>> clearer
> >>>>> (and
> >>>>>>> we
> >>>>>>>>> can
> >>>>>>>>>>>>>>> use
> >>>>>>>>>>>>>>>>>>> `FlushOption#swallowSendErrorsForTransactions()`
> >>>> [working
> >>>>>>> name]
> >>>>>>>>> to
> >>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>> explicitly that the `FlushOption` for now has only an
> >>>>> effect
> >>>>>>> for
> >>>>>>>>>>>>> TX).
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thoughts?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On 6/21/24 4:10 AM, Alieh Saeedi wrote:
> >>>>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> It is very exciting to see all the experts here
> >>> raising
> >>>>>> very
> >>>>>>>>> good
> >>>>>>>>>>>>>>> points.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> As we go further, we see more and more options to
> >>>> improve
> >>>>>> our
> >>>>>>>>>>>>>>> solution,
> >>>>>>>>>>>>>>>>>>>> which makes concluding and updating the KIP
> >>> impossible.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> The main suggestions so far are:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 1. `flush` with `flushOptions` as input parameter
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 2. `commitTx` with `commitOptions` as input parameter
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 3. `send` must throw the exception
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> My concern about the 3rd suggestion:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 1. Does the change cause any issue with backward
> >>>>>>> compatibility?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 2. The `send (bad record)` already transits the
> >>>>> transaction
> >>>>>>> to
> >>>>>>>>> the
> >>>>>>>>>>>>>>> error
> >>>>>>>>>>>>>>>>>>>> state. No user, including Streams is able to transit
> >>>> the
> >>>>>>>>>>>>> transaction
> >>>>>>>>>>>>>>> back
> >>>>>>>>>>>>>>>>>>>> from the error state. Do you mean we remove the
> >>>>>>>>>>>>>>>>>>>> `maybeTransitionToErrorState(e)` from here
> >>>>>>>>>>>>>>>>>>>> <
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> https://github.com/apache/kafka/blob/9b5b434e2a6b2d5290ea403fc02859b1c523d8aa/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1112
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> as well?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>>> Alieh
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Fri, Jun 21, 2024 at 8:45 AM Andrew Schofield <
> >>>>>>>>>>>>>>>>>>> andrew_schofi...@live.com>
> >>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Hi Artem,
> >>>>>>>>>>>>>>>>>>>>> I think you make a good point which is worth further
> >>>>>>>>>>>>> consideration.
> >>>>>>>>>>>>>>> If
> >>>>>>>>>>>>>>>>>>>>> any of the existing methods is really ripe for a
> >>>> change
> >>>>>>> here,
> >>>>>>>>> it’s
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> send() that actually caused the problem. If that can
> >>>> be
> >>>>>>> fixed
> >>>>>>>>> so
> >>>>>>>>>>>>>>> there
> >>>>>>>>>>>>>>>>>>> are
> >>>>>>>>>>>>>>>>>>>>> no situations in which a lurking error breaks a
> >>>>>> transaction,
> >>>>>>>>> that
> >>>>>>>>>>>>>>> might
> >>>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>> the best.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>> Andrew
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On 21 Jun 2024, at 01:51, Artem Livshits <
> >>>>>>>>> alivsh...@confluent.io
> >>>>>>>>>>>>>>>>>>> .INVALID>
> >>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I thought we still wait for requests (and their
> >>>>> errors)
> >>>>>> to
> >>>>>>>>> come
> >>>>>>>>>>>>>>> in and
> >>>>>>>>>>>>>>>>>>>>>> could handle fatal errors appropriately.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> We do wait for requests, but my understanding is
> >>> that
> >>>>>> when
> >>>>>>>>>>>>>>>>>>>>>> commitTransaction("ignore send errors") we want to
> >>>>> ignore
> >>>>>>>>> errors.
> >>>>>>>>>>>>>>> So
> >>>>>>>>>>>>>>>>>>> if
> >>>>>>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>>> do
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 1. send
> >>>>>>>>>>>>>>>>>>>>>> 2. commitTransaction("ignore send errors")
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> the commit will succeed.  You can look at the
> >>> example
> >>>>> in
> >>>>>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-9279
> >>> and
> >>>>>> just
> >>>>>>>>>>>>>>> substitute
> >>>>>>>>>>>>>>>>>>>>>> commitTransaction with commitTransaction("ignore
> >>> send
> >>>>>>>>> errors")
> >>>>>>>>>>>>> and
> >>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>> get
> >>>>>>>>>>>>>>>>>>>>>> the buggy behavior back :-).  Actually, this would
> >>>>>>>>> potentially be
> >>>>>>>>>>>>>>> even
> >>>>>>>>>>>>>>>>>>>>>> worse than the original buggy behavior because the
> >>>> bug
> >>>>>> was
> >>>>>>>>> that
> >>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>> ignored
> >>>>>>>>>>>>>>>>>>>>>> errors that happened in the "send()" method itself,
> >>>> not
> >>>>>>>>>>>>>>> necessarily the
> >>>>>>>>>>>>>>>>>>>>>> ones that we got from the broker.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Actually, looking at
> >>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/11508/files,
> >>>>>>>>>>>>>>>>>>>>>> wouldn't a better solution be to just throw the
> >>> error
> >>>>>> from
> >>>>>>>>> the
> >>>>>>>>>>>>>>> "send"
> >>>>>>>>>>>>>>>>>>>>>> method itself, rather than trying to set it to be
> >>>>> thrown
> >>>>>>>>> during
> >>>>>>>>>>>>>>> commit?
> >>>>>>>>>>>>>>>>>>>>>> This way the example in
> >>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-9279
> >>>>>>>>>>>>>>>>>>>>>> would be fixed, and at the same time it would give
> >>> an
> >>>>>>>>> opportunity
> >>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>> KS
> >>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>> catch the error and ignore it if needed.  Not sure
> >>> if
> >>>>> we
> >>>>>>>>> need a
> >>>>>>>>>>>>>>> KIP for
> >>>>>>>>>>>>>>>>>>>>>> that, just do a better fix of the old bug.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> -Artem
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On Thu, Jun 20, 2024 at 4:58 PM Justine Olshan
> >>>>>>>>>>>>>>>>>>>>> <jols...@confluent.io.invalid>
> >>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I'm a bit late to the party, but the discussion
> >>> here
> >>>>>> looks
> >>>>>>>>>>>>>>> reasonable.
> >>>>>>>>>>>>>>>>>>>>>>> Moving the logic to a transactional method makes
> >>>> sense
> >>>>>> to
> >>>>>>>>> me and
> >>>>>>>>>>>>>>> makes
> >>>>>>>>>>>>>>>>>>>>> me
> >>>>>>>>>>>>>>>>>>>>>>> feel a bit better about keeping the complexity in
> >>>> the
> >>>>>>>>> methods
> >>>>>>>>>>>>>>> relevant
> >>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>> the issue.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> One minor concern is that if we set "ignore send
> >>>>>>>>>>>>>>>>>>>>>>> errors" (or whatever we decide to name it) option
> >>>>>> without
> >>>>>>>>>>>>> explicit
> >>>>>>>>>>>>>>>>>>>>> flush,
> >>>>>>>>>>>>>>>>>>>>>>> it'll actually lead to broken behavior as the
> >>>>>> application
> >>>>>>>>> won't
> >>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>> able
> >>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>> stop a commit from proceeding even on fatal
> >>> errors.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Is this with respect to the case a request is
> >>> still
> >>>>>>> inflight
> >>>>>>>>>>>>> when
> >>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>> call
> >>>>>>>>>>>>>>>>>>>>>>> commitTransaction? I thought we still wait for
> >>>>> requests
> >>>>>>> (and
> >>>>>>>>>>>>> their
> >>>>>>>>>>>>>>>>>>>>> errors)
> >>>>>>>>>>>>>>>>>>>>>>> to come in and could handle fatal errors
> >>>>> appropriately.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Justine
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On Thu, Jun 20, 2024 at 4:32 PM Artem Livshits
> >>>>>>>>>>>>>>>>>>>>>>> <alivsh...@confluent.io.invalid> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Hi Matthias (and other folks who suggested
> >>> ideas),
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> maybe `commitTransaction(CommitOptions)` or
> >>>> similar
> >>>>>>> could
> >>>>>>>>> be a
> >>>>>>>>>>>>>>> good
> >>>>>>>>>>>>>>>>>>>>>>> way
> >>>>>>>>>>>>>>>>>>>>>>>> forward?
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> I like this approach.  One minor concern is that
> >>> if
> >>>>> we
> >>>>>>> set
> >>>>>>>>>>>>>>> "ignore
> >>>>>>>>>>>>>>>>>>> send
> >>>>>>>>>>>>>>>>>>>>>>>> errors" (or whatever we decide to name it) option
> >>>>>> without
> >>>>>>>>>>>>>>> explicit
> >>>>>>>>>>>>>>>>>>>>> flush,
> >>>>>>>>>>>>>>>>>>>>>>>> it'll actually lead to broken behavior as the
> >>>>>> application
> >>>>>>>>> won't
> >>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>> able
> >>>>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>> stop a commit from proceeding even on fatal
> >>> errors.
> >>>>>> But
> >>>>>>> I
> >>>>>>>>>>>>> guess
> >>>>>>>>>>>>>>>>>>> we'll
> >>>>>>>>>>>>>>>>>>>>>>> just
> >>>>>>>>>>>>>>>>>>>>>>>> have to clearly document it.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> In some way we are basically adding a flag to
> >>>>>> optionally
> >>>>>>>>>>>>> restore
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-9279
> >>>>> bug,
> >>>>>>>>> which is
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> motivation for all these changes, anyway :-).
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> -Artem
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> On Thu, Jun 20, 2024 at 2:18 PM Matthias J. Sax <
> >>>>>>>>>>>>>>> mj...@apache.org>
> >>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Seems the option to use a config does not get a
> >>>> lot
> >>>>> of
> >>>>>>>>>>>>> support.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> So we need to go with some form or "overload /
> >>> new
> >>>>>>>>> method". I
> >>>>>>>>>>>>>>> think
> >>>>>>>>>>>>>>>>>>>>>>>>> Chris' point about not coupling it to `flush()`
> >>>> but
> >>>>>>> rather
> >>>>>>>>>>>>>>>>>>>>>>>>> `commitTransaction()` is actually a very good
> >>> one;
> >>>>> for
> >>>>>>>>> non-tx
> >>>>>>>>>>>>>>> case,
> >>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>> different flush variants would not make sense.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> I also like Lianet's idea to pass in some
> >>>> "options"
> >>>>>>>>> object, so
> >>>>>>>>>>>>>>> maybe
> >>>>>>>>>>>>>>>>>>>>>>>>> `commitTransaction(CommitOptions)` or similar
> >>>> could
> >>>>>> be a
> >>>>>>>>> good
> >>>>>>>>>>>>>>> way
> >>>>>>>>>>>>>>>>>>>>>>>>> forward? It's much better than a `boolean`
> >>>>> parameter,
> >>>>>>>>>>>>>>> aesthetically,
> >>>>>>>>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>>>>>>>> we as extendable in the future if necessary.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Given that we would pass in an optional
> >>> parameter,
> >>>>> we
> >>>>>>>>> might
> >>>>>>>>>>>>> not
> >>>>>>>>>>>>>>> even
> >>>>>>>>>>>>>>>>>>>>>>>>> need to deprecate the existing
> >>>> `commitTransaction()`
> >>>>>>>>> method?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> On 6/20/24 9:12 AM, Andrew Schofield wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>> Hi Alieh,
> >>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> I *really* don’t like adding a config which
> >>>> changes
> >>>>>> the
> >>>>>>>>>>>>>>> behaviour
> >>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>> flush() method. We already have too many
> >>> configs.
> >>>>>> But I
> >>>>>>>>>>>>> totally
> >>>>>>>>>>>>>>>>>>>>>>>>> understand
> >>>>>>>>>>>>>>>>>>>>>>>>>> the problem that you’re trying to solve and
> >>> some
> >>>> of
> >>>>>> the
> >>>>>>>>> other
> >>>>>>>>>>>>>>>>>>>>>>>> suggestions
> >>>>>>>>>>>>>>>>>>>>>>>>>> in this thread seem neater.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Personally, I would add another method to
> >>>>>>> KafkaProducer.
> >>>>>>>>> Not
> >>>>>>>>>>>>> an
> >>>>>>>>>>>>>>>>>>>>>>>> overload
> >>>>>>>>>>>>>>>>>>>>>>>>>> on flush() because this is not flush() at all.
> >>>>> Using
> >>>>>>>>>>>>> Matthias’s
> >>>>>>>>>>>>>>>>>>>>>>>> options,
> >>>>>>>>>>>>>>>>>>>>>>>>>> I prefer (3).
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>>>>>> Andrew
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> On 20 Jun 2024, at 15:08, Lianet M. <
> >>>>>>> liane...@gmail.com
> >>>>>>>>>>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Hi all, thanks for the KIP Alieh!
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> LM1. Totally agree with Artem's point about
> >>> the
> >>>>>> config
> >>>>>>>>> not
> >>>>>>>>>>>>>>> being
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>> most
> >>>>>>>>>>>>>>>>>>>>>>>>>>> explicit/flexible way to express this
> >>>> capability.
> >>>>>>>>> Getting
> >>>>>>>>>>>>>>> then to
> >>>>>>>>>>>>>>>>>>>>>>>>> Matthias
> >>>>>>>>>>>>>>>>>>>>>>>>>>> 4 options, what I don't like about 3 and 4 is
> >>>> that
> >>>>>> it
> >>>>>>>>> seems
> >>>>>>>>>>>>>>> they
> >>>>>>>>>>>>>>>>>>>>>>> might
> >>>>>>>>>>>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>>>>>>>>> age very well? Aren't we going to be wanting
> >>>> some
> >>>>>>> other
> >>>>>>>>>>>>> twist
> >>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>> flush
> >>>>>>>>>>>>>>>>>>>>>>>>>>> semantics that will have us adding yet another
> >>>>> param
> >>>>>>> to
> >>>>>>>>> it,
> >>>>>>>>>>>>> or
> >>>>>>>>>>>>>>>>>>>>>>> another
> >>>>>>>>>>>>>>>>>>>>>>>>>>> overloaded method? I truly don't have the
> >>>> context
> >>>>> to
> >>>>>>>>> answer
> >>>>>>>>>>>>>>> that,
> >>>>>>>>>>>>>>>>>>>>>>> but
> >>>>>>>>>>>>>>>>>>>>>>>>> if it
> >>>>>>>>>>>>>>>>>>>>>>>>>>> feels like a realistic future maybe adding
> >>> some
> >>>>> kind
> >>>>>>>>>>>>>>> FlushOptions
> >>>>>>>>>>>>>>>>>>>>>>>>> params to
> >>>>>>>>>>>>>>>>>>>>>>>>>>> the flush would be better from an
> >>> extensibility
> >>>>>> point
> >>>>>>> of
> >>>>>>>>>>>>>>> view. It
> >>>>>>>>>>>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>>>>>>>>>> only have the clearErrors option available for
> >>>> now
> >>>>>> but
> >>>>>>>>> could
> >>>>>>>>>>>>>>>>>>> accept
> >>>>>>>>>>>>>>>>>>>>>>>> any
> >>>>>>>>>>>>>>>>>>>>>>>>>>> other we may need. I find that this would
> >>> remove
> >>>>> the
> >>>>>>>>>>>>>>> "ugliness"
> >>>>>>>>>>>>>>>>>>>>>>>> Matthias
> >>>>>>>>>>>>>>>>>>>>>>>>>>> pointed out for 3. and 4.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> LM2. No matter how we end up expressing the
> >>>>>> different
> >>>>>>>>>>>>>>> semantics
> >>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>>>> flush,
> >>>>>>>>>>>>>>>>>>>>>>>>>>> let's make sure we update the KIP on the flush
> >>>> and
> >>>>>>>>>>>>>>>>>>> commitTransaction
> >>>>>>>>>>>>>>>>>>>>>>>>> java
> >>>>>>>>>>>>>>>>>>>>>>>>>>> docs. It currently states that  flush "clears
> >>>> the
> >>>>>> last
> >>>>>>>>>>>>>>> exception"
> >>>>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>>>>> commitTransaction "will NOT throw" if called
> >>>> after
> >>>>>>>>> flush,
> >>>>>>>>>>>>> but
> >>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>>>> really
> >>>>>>>>>>>>>>>>>>>>>>>>> all
> >>>>>>>>>>>>>>>>>>>>>>>>>>> depends on the config/options/method used.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> LM3. I find it would be helpful to include an
> >>>>>> example
> >>>>>>> to
> >>>>>>>>>>>>> show
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> new
> >>>>>>>>>>>>>>>>>>>>>>>>> flow
> >>>>>>>>>>>>>>>>>>>>>>>>>>> that we're unblocking (I see this as the great
> >>>>> gain
> >>>>>>>>> here):
> >>>>>>>>>>>>>>> flush
> >>>>>>>>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>>>>>>> clear
> >>>>>>>>>>>>>>>>>>>>>>>>>>> error option enabled -> catch and do whatever
> >>>>> error
> >>>>>>>>> handling
> >>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>> want
> >>>>>>>>>>>>>>>>>>>>>>>> ->
> >>>>>>>>>>>>>>>>>>>>>>>>>>> commitTransaction successfully
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks!
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Lianet
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jun 19, 2024 at 11:26 PM Chris
> >>> Egerton <
> >>>>>>>>>>>>>>>>>>>>>>>> fearthecel...@gmail.com
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Matthias,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> I like the alternatives you've listed. One
> >>> more
> >>>>>> that
> >>>>>>>>> might
> >>>>>>>>>>>>>>> help
> >>>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>>>> if,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> instead of overloading flush(), we overloaded
> >>>>>>>>>>>>>>> commitTransaction()
> >>>>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> something like commitTransaction(boolean
> >>>>>>>>>>>>>>> tolerateRecordErrors).
> >>>>>>>>>>>>>>>>>>>>>>> This
> >>>>>>>>>>>>>>>>>>>>>>>>> seems
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> slightly cleaner in that it takes the
> >>>> behavioral
> >>>>>>>>> change we
> >>>>>>>>>>>>>>> want,
> >>>>>>>>>>>>>>>>>>>>>>>> which
> >>>>>>>>>>>>>>>>>>>>>>>>> only
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> applies to transactional producers, to an API
> >>>>>> method
> >>>>>>>>> that
> >>>>>>>>>>>>> is
> >>>>>>>>>>>>>>> only
> >>>>>>>>>>>>>>>>>>>>>>>> used
> >>>>>>>>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> transactional producers. It would also avoid
> >>>> the
> >>>>>>> issue
> >>>>>>>>> of
> >>>>>>>>>>>>>>> whether
> >>>>>>>>>>>>>>>>>>>>>>> or
> >>>>>>>>>>>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> flush() (or a new variant of it with altered
> >>>>>>> semantics)
> >>>>>>>>>>>>>>> should
> >>>>>>>>>>>>>>>>>>>>>>> throw
> >>>>>>>>>>>>>>>>>>>>>>>> or
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> not. Thoughts?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Alieh,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP, I like this direction a
> >>> lot
> >>>>>> more
> >>>>>>>>> than
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> pluggable
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> handler!
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> I share Artem's concerns that enabling this
> >>>>>> behavior
> >>>>>>>>> via
> >>>>>>>>>>>>>>>>>>>>>>>> configuration
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> doesn't seem like a great fit. It's likely
> >>> that
> >>>>>>>>> application
> >>>>>>>>>>>>>>> code
> >>>>>>>>>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> written in a style that only works with one
> >>>> type
> >>>>> of
> >>>>>>>>>>>>> behavior
> >>>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> transactional producers, so requiring that
> >>>>>>> application
> >>>>>>>>> code
> >>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>> declare
> >>>>>>>>>>>>>>>>>>>>>>>>> its
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> expectations for the behavior of its producer
> >>>>> seems
> >>>>>>>>> more
> >>>>>>>>>>>>>>>>>>>>>>> appropriate
> >>>>>>>>>>>>>>>>>>>>>>>>> than,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> e.g., allowing users deploying that
> >>> application
> >>>>> to
> >>>>>>>>> tweak a
> >>>>>>>>>>>>>>>>>>>>>>>>> configuration
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> file that gets fed to producers spun up
> >>> inside
> >>>>> it.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Chris
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jun 19, 2024 at 10:32 PM Matthias J.
> >>>> Sax
> >>>>> <
> >>>>>>>>>>>>>>>>>>> mj...@apache.org
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP Alieh. I actually like
> >>> the
> >>>>> KIP
> >>>>>>>>> as-is,
> >>>>>>>>>>>>> but
> >>>>>>>>>>>>>>>>>>> think
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Arthem raises very good points...
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Seems we have four options on how to move
> >>>>> forward?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>   1. add config to allow "silent error
> >>>>> clearance"
> >>>>>> as
> >>>>>>>>> the
> >>>>>>>>>>>>>>> KIP
> >>>>>>>>>>>>>>>>>>>>>>>> proposes
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>   2. change flush() to clear error and let
> >>> it
> >>>>>> throw
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>   3. add new flushAndThrow()` (or better
> >>> name)
> >>>>>> which
> >>>>>>>>>>>>> clears
> >>>>>>>>>>>>>>>>>>> error
> >>>>>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> throws
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>   4. add `flush(boolean clearAndThrow)` and
> >>>> let
> >>>>>> user
> >>>>>>>>> pick
> >>>>>>>>>>>>>>> (and
> >>>>>>>>>>>>>>>>>>>>>>>>> deprecate
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> existing `flush()`)
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> For (2), given that it would be a behavior
> >>>>> change,
> >>>>>>> we
> >>>>>>>>>>>>> might
> >>>>>>>>>>>>>>> also
> >>>>>>>>>>>>>>>>>>>>>>>> need
> >>>>>>>>>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> public "feature flag" config.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> It seems, both (1) and (2) have the issue
> >>>> Artem
> >>>>>>>>> mentioned.
> >>>>>>>>>>>>>>> (3)
> >>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>> (4)
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> would be safer to this end, however, for
> >>> both
> >>>> we
> >>>>>>>>> kinda get
> >>>>>>>>>>>>>>> an
> >>>>>>>>>>>>>>>>>>> ugly
> >>>>>>>>>>>>>>>>>>>>>>>>> API?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Not sure right now if I have any preference.
> >>>>> Seems
> >>>>>>> we
> >>>>>>>>> need
> >>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> pick
> >>>>>>>>>>>>>>>>>>>>>>>>> some
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> evil and that there is no clear best
> >>> solution?
> >>>>>> Would
> >>>>>>>>> be
> >>>>>>>>>>>>>>> good to
> >>>>>>>>>>>>>>>>>>>>>>> her
> >>>>>>>>>>>>>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> others what they think
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 6/18/24 8:39 PM, Artem Livshits wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Alieh,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thank you for the KIP.  I have a couple of
> >>>>>>>>> suggestions:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> AL1.  We should throw an error from flush
> >>>> after
> >>>>>> we
> >>>>>>>>> clear
> >>>>>>>>>>>>>>> it.
> >>>>>>>>>>>>>>>>>>>>>>> This
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> make it so that both "send + commit" and
> >>>> "send
> >>>>> +
> >>>>>>>>> flush +
> >>>>>>>>>>>>>>>>>>> commit"
> >>>>>>>>>>>>>>>>>>>>>>>> (the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> latter looks like just a more verbose way
> >>> to
> >>>>>>> express
> >>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> former,
> >>>>>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> would be intuitive if it behaves the same)
> >>>>> would
> >>>>>>>>> throw if
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> transaction
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> has an error (so if the code is written
> >>>> either
> >>>>>> way
> >>>>>>>>> it's
> >>>>>>>>>>>>>>> going
> >>>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> correct).
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> At the same time, the latter could be
> >>>> extended
> >>>>> by
> >>>>>>> the
> >>>>>>>>>>>>>>> caller to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> intercept
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> exceptions from flush, ignore as needed,
> >>> and
> >>>>>> commit
> >>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> transaction.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> This
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> solution would keep basic things simple (if
> >>>>>> someone
> >>>>>>>>> has
> >>>>>>>>>>>>>>> code
> >>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> doesn't
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> require advanced error handling, then basic
> >>>>>> "send +
> >>>>>>>>>>>>> flush +
> >>>>>>>>>>>>>>>>>>>>>>> commit"
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> do the right thing) and advanced things
> >>>>> possible,
> >>>>>>> an
> >>>>>>>>>>>>>>>>>>> application
> >>>>>>>>>>>>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> add
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> try + catch around flush and ignore some
> >>>>> errors.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> AL2.  I'm not sure if config is the best
> >>> way
> >>>> to
> >>>>>>>>> express
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> modification
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the "flush" semantics -- the application
> >>>> logic
> >>>>>> that
> >>>>>>>>> calls
> >>>>>>>>>>>>>>>>>>> "flush"
> >>>>>>>>>>>>>>>>>>>>>>>>> needs
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> match the "flush" semantics and configuring
> >>>>>>>>> semantics in
> >>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>>> detached
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> place
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> creates a room for bugs due to
> >>> discrepancies.
> >>>>>> This
> >>>>>>>>> can
> >>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>>>> especially
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> bad
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> if the producer loads configuration from a
> >>>> file
> >>>>>> at
> >>>>>>>>> run
> >>>>>>>>>>>>>>> time, in
> >>>>>>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> case a
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> mistake in configuration could break the
> >>>>>>> application
> >>>>>>>>>>>>>>> because it
> >>>>>>>>>>>>>>>>>>>>>>> was
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> written
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to expect one "flush" semantics but the
> >>>>> semantics
> >>>>>>> is
> >>>>>>>>>>>>>>> switched.
> >>>>>>>>>>>>>>>>>>>>>>>> Given
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the "flush" semantics needs to match the
> >>>>> caller's
> >>>>>>>>>>>>>>> expectation,
> >>>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>>>> way
> >>>>>>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> accomplish that would be to pass the
> >>> caller's
> >>>>>>>>> expectation
> >>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> "flush"
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> call by either have a method with a
> >>> different
> >>>>>> name
> >>>>>>> or
> >>>>>>>>>>>>> have
> >>>>>>>>>>>>>>> an
> >>>>>>>>>>>>>>>>>>>>>>>>> overload
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> a Boolen flag that would configure the
> >>>>> semantics
> >>>>>>> (the
> >>>>>>>>>>>>>>> current
> >>>>>>>>>>>>>>>>>>>>>>>> method
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> just redirect to the new one).
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -Artem
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Jun 17, 2024 at 9:09 AM Alieh
> >>> Saeedi
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> <asae...@confluent.io.invalid>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I'd like to kick off a discussion for
> >>>> KIP-1059
> >>>>>>> that
> >>>>>>>>>>>>>>> suggests
> >>>>>>>>>>>>>>>>>>>>>>>> adding
> >>>>>>>>>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> new
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> feature to the Producer flush() method.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>> <
> >>>>
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1059%3A+Enable+the+Producer+flush%28%29+method+to+clear+the+latest+send%28%29+error
> >>>>>
> >>>>
> >>>
>
>

Reply via email to