Hi, Justine,

Thanks for the explanation.

70. The proposed fencing logic doesn't apply when the pid changes, is that
right? If so, I am not sure how completely we are addressing this issue if
the pid changes more frequently.

Thanks,

Jun



On Fri, Dec 16, 2022 at 9:16 AM Justine Olshan <jols...@confluent.io.invalid>
wrote:

> Hi Jun,
>
> Thanks for replying!
>
> 70. We already do the overflow mechanism, so my change would just make it
> happen more often.
> I was also not suggesting a new field in the log, but in the response,
> which would be gated by the client version. Sorry if something there is
> unclear. I think we are starting to diverge.
> The goal of this KIP is not to change the marker format at all.
>
> 71. Yes, I guess I was going under the assumption that the log would just
> look at its last epoch and treat it as the current epoch. I suppose we can
> have some special logic that if the last epoch was on a marker we actually
> expect the next epoch or something like that. We just need to distinguish
> based on whether we had a commit/abort marker.
>
> 72.
> > if the producer epoch hasn't been bumped on the broker, it seems that the
> > stuck message will fail the sequence validation and will be ignored. If the
> > producer epoch has been bumped, we ignore the sequence check and the stuck
> > message could be appended to the log. So, is the latter case that we want
> > to guard?
>
> I'm not sure I follow that "the message will fail the sequence validation".
> In some of these cases, we had an abort marker (due to an error) and then
> the late message comes in with the correct sequence number. This is a case
> covered by the KIP.
> The latter case is actually not something we've considered here. I think
> generally when we bump the epoch, we are accepting that the sequence does
> not need to be checked anymore. My understanding is also that we don't
> typically bump the epoch mid-transaction (based on a quick look at the
> code), but let me know if that is the case.
>
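A minimal sketch of the fencing discussed in 71 and 72, assuming a simplified per-producer entry on the partition leader (`ProducerEntrySketch`, `AppendResult` and the method names are hypothetical, not Kafka's producer state classes). It shows why a late batch carrying the correct next sequence slips in when the marker keeps the same epoch, and why it is rejected once the marker moves the partition to the next epoch. The rule that a higher epoch must restart at sequence 0 is an assumption for illustration.

```java
/** Result of validating one incoming transactional batch (hypothetical names). */
enum AppendResult { ACCEPTED, FENCED_STALE_EPOCH, OUT_OF_ORDER_SEQUENCE }

/** Simplified state for one producer on one partition; a sketch, not broker code. */
class ProducerEntrySketch {
    private short epoch;
    private int lastSequence = -1; // -1: no batch seen yet in the current epoch

    ProducerEntrySketch(short epoch) {
        this.epoch = epoch;
    }

    AppendResult tryAppend(short batchEpoch, int firstSeq, int lastSeq) {
        if (batchEpoch < epoch) {
            return AppendResult.FENCED_STALE_EPOCH; // late batch from an older epoch
        }
        if (batchEpoch > epoch) {
            // Assumed rule: a new epoch restarts sequences, so only sequence 0 is valid.
            if (firstSeq != 0) {
                return AppendResult.OUT_OF_ORDER_SEQUENCE;
            }
        } else if (firstSeq != lastSequence + 1) {
            return AppendResult.OUT_OF_ORDER_SEQUENCE;
        }
        epoch = batchEpoch;
        lastSequence = lastSeq;
        return AppendResult.ACCEPTED;
    }

    /** Writes a commit/abort marker; bumpEpoch models "one epoch per transaction". */
    void writeMarker(boolean bumpEpoch) {
        if (bumpEpoch) {
            epoch = (short) (epoch + 1);
            lastSequence = -1; // the next transaction starts again at sequence 0
        }
    }
}
```

With `writeMarker(false)` a late batch at the same epoch and the next sequence is accepted after the abort; with `writeMarker(true)` the same batch is fenced as stale.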
> Thanks,
> Justine
>
> On Thu, Dec 15, 2022 at 12:23 PM Jun Rao <j...@confluent.io.invalid> wrote:
>
> > Hi, Justine,
> >
> > Thanks for the reply.
> >
> > 70. Assigning a new pid on int overflow seems a bit hacky. If we need a
> > txn level id, it would be better to model this explicitly. Adding a new
> > field would require a bit more work since it requires a new txn marker
> > format in the log. So, we probably need to guard it with an IBP or
> > metadata version and document the impact on downgrade once the new format
> > is written to the log.
> >
> > 71. Hmm, once the marker is written, the partition will expect the next
> > append to be on the next epoch. Does that cover the case you mentioned?
> >
> > 72. Also, just to be clear on the stuck message issue described in the
> > motivation. With EoS, we also validate the sequence id for idempotency.
> > So, with the current logic, if the producer epoch hasn't been bumped on the
> > broker, it seems that the stuck message will fail the sequence validation
> > and will be ignored. If the producer epoch has been bumped, we ignore the
> > sequence check and the stuck message could be appended to the log. So, is
> > the latter case that we want to guard?
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Dec 14, 2022 at 10:44 AM Justine Olshan
> > <jols...@confluent.io.invalid> wrote:
> >
> > > Matthias, thanks again for taking the time to look at this. You said:
> > >
> > > > My proposal was only focusing to avoid dangling transactions if
> > > > records are added without registered partition. -- Maybe you can add a
> > > > few more details to the KIP about this scenario for better
> > > > documentation purpose?
> > >
> > >
> > > I'm not sure I understand what you mean here. The motivation section
> > > describes two scenarios about how the record can be added without a
> > > registered partition:
> > >
> > >
> > > > This can happen when a message gets stuck or delayed due to networking
> > > > issues or a network partition, the transaction aborts, and then the
> > > > delayed message finally comes in.
> > >
> > >
> > > > Another way hanging transactions can occur is that a client is buggy
> > > > and may somehow try to write to a partition before it adds the
> > > > partition to the transaction.
> > >
> > >
> > >
> > > For the first example, would it be helpful to say that this message
> > > comes in after the abort but before the partition is added to the next
> > > transaction, so it becomes "hanging"? Perhaps the next sentence, which
> > > describes the message becoming part of the next transaction (a different
> > > case), was not properly differentiated.
> > >
> > >
> > >
> > > Jun — thanks for reading the KIP.
> > >
> > > 70. The int typing was a concern. Currently we have a mechanism in place
> > > to fence the final epoch when the epoch is about to overflow and assign a
> > > new producer ID with epoch 0. Of course, this is a bit tricky when it
> > > comes to the response back to the client.
> > > Making this a long could be another option, but I wonder whether there
> > > are any implications of changing this field if the epoch is persisted to
> > > disk. I'd need to check the usages.
> > >
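A minimal sketch of the overflow mechanism described in 70: once the epoch is about to overflow, the coordinator hands out a fresh producer ID with epoch 0 instead of incrementing. The epoch is modeled as a Java `short`; the exhaustion threshold, `IdAndEpoch`, and `EpochBumpSketch` are assumptions for illustration, not the coordinator's actual code.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Producer ID and epoch pair (hypothetical helper type). */
final class IdAndEpoch {
    final long producerId;
    final short epoch;

    IdAndEpoch(long producerId, short epoch) {
        this.producerId = producerId;
        this.epoch = epoch;
    }
}

final class EpochBumpSketch {
    // Assumed threshold: treat Short.MAX_VALUE - 1 as the last epoch handed out.
    static final short LAST_USABLE_EPOCH = Short.MAX_VALUE - 1;

    private final AtomicLong nextProducerId;

    EpochBumpSketch(long firstFreeProducerId) {
        this.nextProducerId = new AtomicLong(firstFreeProducerId);
    }

    /** Returns the identity the next transaction should use. */
    IdAndEpoch bump(IdAndEpoch current) {
        if (current.epoch >= LAST_USABLE_EPOCH) {
            // Epoch exhausted: switch to a brand-new producer ID starting at epoch 0.
            return new IdAndEpoch(nextProducerId.getAndIncrement(), (short) 0);
        }
        return new IdAndEpoch(current.producerId, (short) (current.epoch + 1));
    }
}
```

In the sketch, the producer ID change is invisible to anything that compares epochs only, which is why the response back to the client needs to carry the new ID as well.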
> > > 71. This was something Matthias asked about as well. I was considering a
> > > possible edge case where a produce request from a new transaction somehow
> > > gets sent right after the marker is written, but before the producer is
> > > alerted of the newly bumped epoch. In this case, we may include this
> > > record when we don't want to. I suppose we could try to do something
> > > client side to bump the epoch after sending an endTxn as well in this
> > > scenario, but I wonder how it would work when the server is aborting
> > > based on a server-side error. I could also be missing something and this
> > > scenario is actually not possible.
> > >
> > > Thanks again to everyone reading and commenting. Let me know about any
> > > further questions or comments.
> > >
> > > Justine
> > >
> > > On Wed, Dec 14, 2022 at 9:41 AM Jun Rao <j...@confluent.io.invalid> wrote:
> > >
> > > > Hi, Justine,
> > > >
> > > > Thanks for the KIP. A couple of comments.
> > > >
> > > > 70. Currently, the producer epoch is an int. I am not sure if it's
> > > > enough to accommodate all transactions in the lifetime of a producer.
> > > > Should we change that to a long or add a new long field like txnId?
> > > >
> > > > 71. "it will write the prepare commit message with a bumped epoch and
> > > > send WriteTxnMarkerRequests with the bumped epoch." Hmm, the epoch is
> > > > associated with the current txn, right? So, it seems weird to write a
> > > > commit message with a bumped epoch. Should we only bump up the epoch in
> > > > EndTxnResponse and rename the field to something like nextProducerEpoch?
> > > >
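To make the two options in 71 concrete (the same epoch 73 vs. 74 question Matthias raises under 40), here is a small sketch built around a hypothetical `endTxn` helper: either the marker carries the current epoch and only the response advertises the next one (the `nextProducerEpoch` idea), or the marker itself carries the bumped epoch. All names are illustrative, and overflow handling is deliberately left out.

```java
/** Which epoch the (hypothetical) coordinator stamps on the commit/abort marker. */
enum MarkerEpochPolicy { CURRENT_EPOCH, BUMPED_EPOCH }

/** What the coordinator writes and what it tells the producer; illustrative only. */
final class EndTxnOutcome {
    final short markerEpoch;       // epoch written on the prepare/complete marker
    final short nextProducerEpoch; // epoch the producer must use for its next transaction

    EndTxnOutcome(short markerEpoch, short nextProducerEpoch) {
        this.markerEpoch = markerEpoch;
        this.nextProducerEpoch = nextProducerEpoch;
    }
}

final class EndTxnSketch {
    static EndTxnOutcome endTxn(short currentEpoch, MarkerEpochPolicy policy) {
        short next = (short) (currentEpoch + 1); // overflow handling omitted in this sketch
        short marker = (policy == MarkerEpochPolicy.CURRENT_EPOCH) ? currentEpoch : next;
        return new EndTxnOutcome(marker, next);
    }
}
```

In both variants the producer moves to the same next epoch; the difference is only which epoch a log dump shows on the marker.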
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > >
> > > > On Mon, Dec 12, 2022 at 8:54 PM Matthias J. Sax <mj...@apache.org> wrote:
> > > >
> > > > > Thanks for the background.
> > > > >
> > > > > 20/30: SGTM. My proposal was only focused on avoiding dangling
> > > > > transactions if records are added without a registered partition. --
> > > > > Maybe you can add a few more details to the KIP about this scenario for
> > > > > better documentation purposes?
> > > > >
> > > > > 40: I think you hit a fair point about race conditions or client bugs
> > > > > (incorrectly not bumping the epoch). The complexity/confusion I see in
> > > > > using the bumped epoch is mainly for internal debugging, i.e.,
> > > > > inspecting log segment dumps -- it seems harder for us humans to reason
> > > > > about the system. But if we get better guarantees, it would be worth
> > > > > using the bumped epoch.
> > > > >
> > > > > 60: As I mentioned already, I don't know the broker internals well
> > > > > enough to provide more input. So if nobody else chimes in, we should
> > > > > just move forward with your proposal.
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > >
> > > > > On 12/6/22 4:22 PM, Justine Olshan wrote:
> > > > > > Hi all,
> > > > > > After Artem's questions about error behavior, I've re-evaluated the
> > > > > > unknown producer ID exception and had some discussions offline.
> > > > > >
> > > > > > I think generally it makes sense to simplify error handling in cases
> > > > > > like this, and the UNKNOWN_PRODUCER_ID error has a pretty long and
> > > > > > complicated history. Because of this, I propose adding a new error
> > > > > > code, ABORTABLE_ERROR, that when encountered by new clients (gated by
> > > > > > the produce request version) will simply abort the transaction. This
> > > > > > allows the server to have some say in whether the client aborts and
> > > > > > makes handling much simpler. In the future, we can also use this error
> > > > > > in other situations where we want to abort the transactions. We can
> > > > > > even use it on other APIs.
> > > > > >
> > > > > > I've added this to the KIP. Let me know if there are any questions or
> > > > > > issues.
> > > > > >
> > > > > > Justine
> > > > > >
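A sketch of the version gating described in the Dec 6 message, assuming a hypothetical minimum produce request version (the real cutoff is whatever the KIP assigns) and treating the error sent to older clients as a caller-supplied value, since the message does not specify it. The enums and class names are illustrative; only the error names mirror Kafka's.

```java
/** Hypothetical subset of error codes used by this sketch. */
enum ProduceErrorCode { ABORTABLE_ERROR, UNKNOWN_PRODUCER_ID, INVALID_TXN_STATE }

enum ClientReaction { ABORT_TRANSACTION, LEGACY_HANDLING }

final class AbortableErrorGate {
    // Hypothetical cutoff: first produce request version whose clients understand ABORTABLE_ERROR.
    static final short MIN_VERSION_FOR_ABORTABLE_ERROR = 10;

    /** Broker side: pick the error to return for a condition that should abort the transaction. */
    static ProduceErrorCode errorFor(short requestVersion, ProduceErrorCode legacyError) {
        return requestVersion >= MIN_VERSION_FOR_ABORTABLE_ERROR
                ? ProduceErrorCode.ABORTABLE_ERROR
                : legacyError;
    }

    /** New-client side: ABORTABLE_ERROR always means "abort the current transaction". */
    static ClientReaction react(ProduceErrorCode error) {
        return error == ProduceErrorCode.ABORTABLE_ERROR
                ? ClientReaction.ABORT_TRANSACTION
                : ClientReaction.LEGACY_HANDLING;
    }
}
```

Keeping the decision on the broker side is what lets the server "have some say" in whether the client aborts; the client-side rule stays a single comparison.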
> > > > > > On Fri, Dec 2, 2022 at 10:22 AM Justine Olshan <jols...@confluent.io> wrote:
> > > > > >
> > > > > >> Hey Matthias,
> > > > > >>
> > > > > >>
> > > > > >> 20/30: Maybe I also didn't express myself clearly. For older clients
> > > > > >> we don't have a way to distinguish between a previous and the current
> > > > > >> transaction since we don't have the epoch bump. This means that a late
> > > > > >> message from the previous transaction may be added to the new one.
> > > > > >> With older clients, we can't guarantee this won't happen if we already
> > > > > >> sent the addPartitionsToTxn call (which is why we make changes for the
> > > > > >> newer client), but we can at least gate some by ensuring that the
> > > > > >> partition has been added to the transaction. The rationale here is
> > > > > >> that there are likely FEWER late arrivals as time goes on, so
> > > > > >> hopefully most late arrivals will come in BEFORE the
> > > > > >> addPartitionsToTxn call. Those that arrive before will be properly
> > > > > >> gated with the describeTransactions approach.
> > > > > >>
> > > > > >> If we take the approach you suggested, ANY late arrival from a
> > > > > >> previous transaction will be added. And we don't want that. I also
> > > > > >> don't see any benefit in sending addPartitionsToTxn over the
> > > > > >> describeTxns call. They will both be one extra RPC to the Txn
> > > > > >> coordinator.
> > > > > >>
> > > > > >>
> > > > > >> To be clear, newer clients will use addPartitionsToTxn instead of
> > > > > >> DescribeTxns.
> > > > > >>
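The difference between the two gates for older clients can be shown with a small in-memory sketch (all class and method names are hypothetical; a real leader talks to the coordinator over RPCs). A late arrival from an aborted transaction shows up after the next transaction began but before the producer added the partition: the verify-only gate (the describeTransactions approach) rejects it, while adding the partition on the producer's behalf pulls it into the new transaction.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical in-memory view of one producer's transaction on the coordinator. */
final class CoordinatorTxnView {
    private final Set<String> partitions = new HashSet<>();
    private boolean ongoing = false;

    void begin() {
        ongoing = true;
    }

    /** Commit or abort: the transaction ends and its partition set is cleared. */
    void complete() {
        ongoing = false;
        partitions.clear();
    }

    /** DescribeTransactions-style read: is the partition part of the ongoing transaction? */
    boolean contains(String topicPartition) {
        return ongoing && partitions.contains(topicPartition);
    }

    /** AddPartitionsToTxn-style write: idempotent, so a repeated add changes nothing. */
    boolean add(String topicPartition) {
        if (!ongoing) {
            return false;
        }
        partitions.add(topicPartition);
        return true;
    }
}

final class OlderClientGate {
    /** Verify-only gate: reject the append unless the partition was already added. */
    static boolean acceptVerifyOnly(CoordinatorTxnView txn, String tp) {
        return txn.contains(tp);
    }

    /** Add-on-behalf gate: the leader adds the partition itself, then accepts. */
    static boolean acceptAddOnBehalf(CoordinatorTxnView txn, String tp) {
        return txn.add(tp);
    }
}
```

Both gates cost one round trip to the coordinator; they differ only in which late arrivals they let through.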
> > > > > >>
> > > > > >> 40)
> > > > > >> My concern is that if we have some delay in the client to bump the
> > > > > >> epoch, it could continue to send epoch 73 and those records would not
> > > > > >> be fenced. Perhaps this is not an issue if we don't allow the next
> > > > > >> produce to go through before the EndTxn request returns. I'm also
> > > > > >> thinking about cases of failure. I will need to think on this a bit.
> > > > > >>
> > > > > >> I wasn't sure if it was that confusing. But if we think it is, we can
> > > > > >> investigate other ways.
> > > > > >>
> > > > > >>
> > > > > >> 60)
> > > > > >>
> > > > > >> I'm not sure these are the same purgatories since one is a produce
> > > > > >> purgatory (I was planning on using a callback rather than purgatory)
> > > > > >> and the other is simply a request to append to the log. Not sure we
> > > > > >> have any structure here for ordering, but my understanding is that the
> > > > > >> broker could handle the write request before it hears back from the
> > > > > >> Txn Coordinator.
> > > > > >>
> > > > > >> Let me know if I misunderstood something or something was unclear.
> > > > > >>
> > > > > >> Justine
> > > > > >>
> > > > > >> On Thu, Dec 1, 2022 at 12:15 PM Matthias J. Sax <mj...@apache.org> wrote:
> > > > > >>
> > > > > >>> Thanks for the details Justine!
> > > > > >>>
> > > > > >>>> 20)
> > > > > >>>>
> > > > > >>>> The client side change for 2 is removing the addPartitions to
> > > > > >>>> transaction call. We don't need to make this from the producer to
> > > > > >>>> the txn coordinator, only server side.
> > > > > >>>
> > > > > >>> I think I did not express myself clearly. I understand that we can
> > > > > >>> (and should) change the producer to not send the `addPartitions`
> > > > > >>> request any longer. But I don't think it's a requirement to change the
> > > > > >>> broker?
> > > > > >>>
> > > > > >>> What I am trying to say is: as a safe-guard and improvement for older
> > > > > >>> producers, the partition leader can just send the `addPartitions`
> > > > > >>> request to the TX-coordinator in any case -- if the old producer
> > > > > >>> correctly did send the `addPartition` request to the TX-coordinator
> > > > > >>> already, the TX-coordinator can just "ignore" it as idempotent.
> > > > > >>> However, if the old producer has a bug and did forget to send the
> > > > > >>> `addPartition` request, we would now ensure that the partition is
> > > > > >>> indeed added to the TX and thus fix a potential producer bug (even if
> > > > > >>> we don't get the fencing via the bumped epoch). -- It seems to be a
> > > > > >>> good improvement? Or is there a reason to not do this?
> > > > > >>>
> > > > > >>>
> > > > > >>>
> > > > > >>>> 30)
> > > > > >>>>
> > > > > >>>> Transaction is ongoing = partition was added to transaction via
> > > > > >>>> addPartitionsToTxn. We check this with the DescribeTransactions
> > > > > >>>> call. Let me know if this wasn't sufficiently explained here:
> > > > > >>>
> > > > > >>> If we do what I propose in (20), we don't really need to make this
> > > > > >>> `DescribeTransaction` call, as the partition leader adds the partition
> > > > > >>> for older clients and we get this check for free.
> > > > > >>>
> > > > > >>>
> > > > > >>>> 40)
> > > > > >>>>
> > > > > >>>> The idea here is that if any messages somehow come in before we get
> > > > > >>>> the new epoch to the producer, they will be fenced. However, if we
> > > > > >>>> don't think this is necessary, it can be discussed.
> > > > > >>>
> > > > > >>> I agree that we should have epoch fencing. My question is different:
> > > > > >>> Assume we are at epoch 73, and we have an ongoing transaction that is
> > > > > >>> committed. It seems natural to write the "prepare commit" marker and
> > > > > >>> the `WriteTxMarkerRequest` both with epoch 73, too, as it belongs to
> > > > > >>> the current transaction. Of course, we now also bump the epoch and
> > > > > >>> expect the next requests to have epoch 74, and would reject a request
> > > > > >>> with epoch 73, as the corresponding TX for epoch 73 was already
> > > > > >>> committed.
> > > > > >>>
> > > > > >>> It seems you propose to write the "prepare commit marker" and
> > > > > >>> `WriteTxMarkerRequest` with epoch 74 though, which would work, but it
> > > > > >>> seems confusing. Is there a reason why we would use the bumped epoch
> > > > > >>> 74 instead of the current epoch 73?
> > > > > >>>
> > > > > >>>
> > > > > >>>> 60)
> > > > > >>>>
> > > > > >>>> When we are checking if the transaction is ongoing, we need to make
> > > > > >>>> a round trip from the leader partition to the transaction
> > > > > >>>> coordinator. In the time we are waiting for this message to come
> > > > > >>>> back, in theory we could have sent a commit/abort call that would
> > > > > >>>> make the original result of the check out of date. That is why we
> > > > > >>>> can check the leader state before we write to the log.
> > > > > >>>
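A minimal sketch of the re-check described in 60, assuming the leader tracks a per-producer count of markers written (a hypothetical stand-in for the real leader state). The count is captured before the coordinator round trip and compared again when the verification callback fires, so a commit/abort that lands in between invalidates the earlier answer. `CompletableFuture` stands in for the callback; `VerifiedAppendSketch` and its methods are illustrative names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

final class VerifiedAppendSketch {
    // Hypothetical leader state: how many commit/abort markers this producer has had written.
    private final AtomicLong markersWritten = new AtomicLong();

    void onMarkerWritten() {
        markersWritten.incrementAndGet();
    }

    /**
     * Completes with true only if the coordinator said the transaction is ongoing AND
     * no marker was written while the verification round trip was in flight.
     */
    CompletableFuture<Boolean> verifyThenCheck(CompletableFuture<Boolean> coordinatorSaysOngoing) {
        final long observedBefore = markersWritten.get();
        return coordinatorSaysOngoing.thenApply(
                ongoing -> ongoing && markersWritten.get() == observedBefore);
    }
}
```

A real leader would also need the re-check and the append to happen atomically (for example under the partition lock); the sketch only shows the stale-answer detection.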
> > > > > >>> Thanks. Got it.
> > > > > >>>
> > > > > >>> However, is this really an issue? We put the produce request in
> > > > > >>> purgatory, so how could we process the `WriteTxnMarkerRequest` first?
> > > > > >>> Don't we need to put the `WriteTxnMarkerRequest` into purgatory, too,
> > > > > >>> for this case, and process both requests in order? (Again, my broker
> > > > > >>> knowledge is limited and maybe we don't maintain request order for
> > > > > >>> this case, which seems to be an issue IMHO, and I am wondering if
> > > > > >>> changing request handling to preserve order for this case might be
> > > > > >>> the cleaner solution?)
> > > > > >>>
> > > > > >>>
> > > > > >>>
> > > > > >>> -Matthias
> > > > > >>>
> > > > > >>>
> > > > > >>>
> > > > > >>>
> > > > > >>> On 11/30/22 3:28 PM, Artem Livshits wrote:
> > > > > >>>> Hi Justine,
> > > > > >>>>
> > > > > >>>> I think the interesting part is not in this logic (because it tries
> > > > > >>>> to figure out when UNKNOWN_PRODUCER_ID is retriable, and if it's
> > > > > >>>> retriable, it's definitely not fatal), but what happens when this
> > > > > >>>> logic doesn't return 'true' and falls through. In the old clients it
> > > > > >>>> seems to be fatal; if we keep the behavior in the new clients, I'd
> > > > > >>>> expect it would be fatal as well.
> > > > > >>>>
> > > > > >>>> -Artem
> > > > > >>>>
> > > > > >>>> On Tue, Nov 29, 2022 at 11:57 AM Justine Olshan
> > > > > >>>> <jols...@confluent.io.invalid> wrote:
> > > > > >>>>
> > > > > >>>>> Hi Artem and Jeff,
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> Thanks for taking a look and sorry for the slow response.
> > > > > >>>>>
> > > > > >>>>> You both mentioned the change to handle UNKNOWN_PRODUCER_ID errors.
> > > > > >>>>> To be clear, this error code will only be sent again when the
> > > > > >>>>> client's request version is high enough to ensure we handle it
> > > > > >>>>> correctly.
> > > > > >>>>> The current (Java) client handles this with the following (somewhat
> > > > > >>>>> long) code snippet:
> > > > > >>>>>
> > > > > >>>>> // An UNKNOWN_PRODUCER_ID means that we have lost the producer state on the
> > > > > >>>>> // broker. Depending on the log start offset, we may want to retry these, as
> > > > > >>>>> // described for each case below. If none of those apply, then for the
> > > > > >>>>> // idempotent producer, we will locally bump the epoch and reset the sequence
> > > > > >>>>> // numbers of in-flight batches from sequence 0, then retry the failed batch,
> > > > > >>>>> // which should now succeed. For the transactional producer, allow the batch
> > > > > >>>>> // to fail. When processing the failed batch, we will transition to an
> > > > > >>>>> // abortable error and set a flag indicating that we need to bump the epoch
> > > > > >>>>> // (if supported by the broker).
> > > > > >>>>> if (error == Errors.UNKNOWN_PRODUCER_ID) {
> > > > > >>>>>     if (response.logStartOffset == -1) {
> > > > > >>>>>         // We don't know the log start offset with this response. We should
> > > > > >>>>>         // just retry the request until we get it.
> > > > > >>>>>         // The UNKNOWN_PRODUCER_ID error code was added along with the new
> > > > > >>>>>         // ProduceResponse which includes the logStartOffset. So the '-1'
> > > > > >>>>>         // sentinel is not for backward compatibility. Instead, it is possible
> > > > > >>>>>         // for a broker to not know the logStartOffset at when it is returning
> > > > > >>>>>         // the response because the partition may have moved away from the
> > > > > >>>>>         // broker from the time the error was initially raised to the time the
> > > > > >>>>>         // response was being constructed. In these cases, we should just
> > > > > >>>>>         // retry the request: we are guaranteed to eventually get a
> > > > > >>>>>         // logStartOffset once things settle down.
> > > > > >>>>>         return true;
> > > > > >>>>>     }
> > > > > >>>>>
> > > > > >>>>>     if (batch.sequenceHasBeenReset()) {
> > > > > >>>>>         // When the first inflight batch fails due to the truncation case,
> > > > > >>>>>         // then the sequences of all the other in flight batches would have
> > > > > >>>>>         // been restarted from the beginning. However, when those responses
> > > > > >>>>>         // come back from the broker, they would also come with an
> > > > > >>>>>         // UNKNOWN_PRODUCER_ID error. In this case, we should not reset the
> > > > > >>>>>         // sequence numbers to the beginning.
> > > > > >>>>>         return true;
> > > > > >>>>>     } else if (lastAckedOffset(batch.topicPartition).orElse(
> > > > > >>>>>             NO_LAST_ACKED_SEQUENCE_NUMBER) < response.logStartOffset) {
> > > > > >>>>>         // The head of the log has been removed, probably due to the
> > > > > >>>>>         // retention time elapsing. In this case, we expect to lose the
> > > > > >>>>>         // producer state. For the transactional producer, reset the
> > > > > >>>>>         // sequences of all inflight batches to be from the beginning and
> > > > > >>>>>         // retry them, so that the transaction does not need to be aborted.
> > > > > >>>>>         // For the idempotent producer, bump the epoch to avoid reusing
> > > > > >>>>>         // (sequence, epoch) pairs.
> > > > > >>>>>         if (isTransactional()) {
> > > > > >>>>>             txnPartitionMap.startSequencesAtBeginning(batch.topicPartition,
> > > > > >>>>>                     this.producerIdAndEpoch);
> > > > > >>>>>         } else {
> > > > > >>>>>             requestEpochBumpForPartition(batch.topicPartition);
> > > > > >>>>>         }
> > > > > >>>>>         return true;
> > > > > >>>>>     }
> > > > > >>>>>
> > > > > >>>>>     if (!isTransactional()) {
> > > > > >>>>>         // For the idempotent producer, always retry UNKNOWN_PRODUCER_ID
> > > > > >>>>>         // errors. If the batch has the current producer ID and epoch,
> > > > > >>>>>         // request a bump of the epoch. Otherwise just retry the produce.
> > > > > >>>>>         requestEpochBumpForPartition(batch.topicPartition);
> > > > > >>>>>         return true;
> > > > > >>>>>     }
> > > > > >>>>> }
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> I was considering keeping this behavior, but am open to simplifying
> > > > > >>>>> it.
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> We are leaving changes to older clients off the table here since it
> > > > > >>>>> caused many issues for clients in the past. Previously this was a
> > > > > >>>>> fatal error and we didn't have the mechanisms in place to detect when
> > > > > >>>>> this was a legitimate case vs some bug or gap in the protocol.
> > > > > >>>>> Ensuring each transaction has its own epoch should close this gap.
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> And to address Jeff's second point:
> > > > > >>>>> *does the typical produce request path append records to local log
> > > > > >>>>> along with the currentTxnFirstOffset information? I would like to
> > > > > >>>>> understand when the field is written to disk.*
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> Yes, the first produce request populates this field and writes the
> > > > > >>>>> offset as part of the record batch and also to the producer state
> > > > > >>>>> snapshot. When we reload the records on restart and/or reassignment,
> > > > > >>>>> we repopulate this field with the snapshot from disk along with the
> > > > > >>>>> rest of the producer state.
> > > > > >>>>>
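A sketch of the lifecycle described above, assuming a simplified map from producer ID to `currentTxnFirstOffset` (the class `TxnFirstOffsetState` is hypothetical and is not the broker's snapshot format): the first transactional batch sets the offset, later batches leave it alone, a marker clears it, and a snapshot copy is what gets reloaded after a restart or reassignment.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

final class TxnFirstOffsetState {
    private final Map<Long, Long> firstOffsetByProducer = new HashMap<>();

    /** Called for every transactional batch; only the first one in a transaction sets the field. */
    void onTransactionalAppend(long producerId, long batchBaseOffset) {
        firstOffsetByProducer.putIfAbsent(producerId, batchBaseOffset);
    }

    /** A commit/abort marker ends the transaction, so the field is cleared. */
    void onMarker(long producerId) {
        firstOffsetByProducer.remove(producerId);
    }

    OptionalLong currentTxnFirstOffset(long producerId) {
        Long offset = firstOffsetByProducer.get(producerId);
        return offset == null ? OptionalLong.empty() : OptionalLong.of(offset);
    }

    /** Stand-in for writing the producer state snapshot to disk. */
    Map<Long, Long> snapshot() {
        return new HashMap<>(firstOffsetByProducer);
    }

    /** Stand-in for reloading the snapshot on restart or reassignment. */
    static TxnFirstOffsetState restore(Map<Long, Long> snapshot) {
        TxnFirstOffsetState state = new TxnFirstOffsetState();
        state.firstOffsetByProducer.putAll(snapshot);
        return state;
    }
}
```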
> > > > > >>>>> Let me know if there are further comments and/or questions.
> > > > > >>>>>
> > > > > >>>>> Thanks,
> > > > > >>>>> Justine
> > > > > >>>>>
> > > > > >>>>> On Tue, Nov 22, 2022 at 9:00 PM Jeff Kim <jeff....@confluent.io.invalid>
> > > > > >>>>> wrote:
> > > > > >>>>>
> > > > > >>>>>> Hi Justine,
> > > > > >>>>>>
> > > > > >>>>>> Thanks for the KIP! I have two questions:
> > > > > >>>>>>
> > > > > >>>>>> 1) For new clients, we can once again return an error
> > > > > >>>>>> UNKNOWN_PRODUCER_ID for sequences that are non-zero when there is no
> > > > > >>>>>> producer state present on the server. This will indicate we missed
> > > > > >>>>>> the 0 sequence and we don't yet want to write to the log.
> > > > > >>>>>>
> > > > > >>>>>> I would like to understand the current behavior to handle older
> > > > > >>>>>> clients, and if there are any changes we are making. Maybe I'm
> > > > > >>>>>> missing something, but we would want to identify whether we missed
> > > > > >>>>>> the 0 sequence for older clients, no?
> > > > > >>>>>>
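The condition quoted in 1) can be sketched as two small predicates, assuming a boolean that says whether the client's request version is new enough (how that is decided is not spelled out in the quoted text). `MissedSequenceZeroCheck` and its methods are hypothetical names; the sketch deliberately does not model what happens for older clients, since that is exactly what is being asked.

```java
final class MissedSequenceZeroCheck {
    /** No producer state on the leader plus a non-zero first sequence means sequence 0 was missed. */
    static boolean missedSequenceZero(boolean hasProducerState, int firstSequence) {
        return !hasProducerState && firstSequence != 0;
    }

    /** Per the quoted proposal, only new clients get UNKNOWN_PRODUCER_ID back for this case. */
    static boolean shouldReturnUnknownProducerId(boolean hasProducerState, int firstSequence,
                                                 boolean clientIsNewEnough) {
        return clientIsNewEnough && missedSequenceZero(hasProducerState, firstSequence);
    }
}
```

Note that `missedSequenceZero` does not depend on the client version; the open question is the broker's action when it is true and `clientIsNewEnough` is false.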
> > > > > >>>>>> 2) Upon returning from the transaction coordinator, we can set the
> > > > > >>>>>> transaction as ongoing on the leader by populating
> > > > > >>>>>> currentTxnFirstOffset through the typical produce request handling.
> > > > > >>>>>>
> > > > > >>>>>> Does the typical produce request path append records to local log
> > > > > >>>>>> along with the currentTxnFirstOffset information? I would like to
> > > > > >>>>>> understand when the field is written to disk.
> > > > > >>>>>>
> > > > > >>>>>> Thanks,
> > > > > >>>>>> Jeff
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>> On Tue, Nov 22, 2022 at 4:44 PM Artem Livshits
> > > > > >>>>>> <alivsh...@confluent.io.invalid> wrote:
> > > > > >>>>>>
> > > > > >>>>>>> Hi Justine,
> > > > > >>>>>>>
> > > > > >>>>>>> Thank you for the KIP.  I have one question.
> > > > > >>>>>>>
> > > > > >>>>>>> 5) For new clients, we can once again return an error
> > > > > >>>>>>> UNKNOWN_PRODUCER_ID
> > > > > >>>>>>>
> > > > > >>>>>>> I believe we had problems in the past with returning
> > > > > >>>>>>> UNKNOWN_PRODUCER_ID because it was considered fatal and required
> > > > > >>>>>>> client restart. It would be good to spell out the new client
> > > > > >>>>>>> behavior when it receives the error.
> > > > > >>>>>>>
> > > > > >>>>>>> -Artem
> > > > > >>>>>>>
> > > > > >>>>>>> On Tue, Nov 22, 2022 at 10:00 AM Justine Olshan
> > > > > >>>>>>> <jols...@confluent.io.invalid> wrote:
> > > > > >>>>>>>
> > > > > >>>>>>>> Thanks for taking a look, Matthias. I've tried to answer your
> > > > > >>>>>>>> questions below:
> > > > > >>>>>>>>
> > > > > >>>>>>>> 10)
> > > > > >>>>>>>>
> > > > > >>>>>>>> Right, so the hanging transaction only occurs when we have a late
> > > > > >>>>>>>> message come in and the partition is never added to a transaction
> > > > > >>>>>>>> again. If we never add the partition to a transaction, we will never
> > > > > >>>>>>>> write a marker and never advance the LSO.
> > > > > >>>>>>>>
> > > > > >>>>>>>> If we do end up adding the partition to the transaction (I suppose
> > > > > >>>>>>>> this can happen before or after the late message comes in), then we
> > > > > >>>>>>>> will include the late message in the next (incorrect) transaction.
> > > > > >>>>>>>>
> > > > > >>>>>>>> So perhaps it is clearer to make the distinction between messages
> > > > > >>>>>>>> that eventually get added to a transaction (but the wrong one) and
> > > > > >>>>>>>> messages that never get added and become hanging.
> > > > > >>>>>>>>
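To see why a partition that is never added to a later transaction blocks the LSO, here is a minimal sketch, assuming the LSO is the smaller of the high watermark and the earliest first offset of any still-open transaction on the partition. `LsoSketch` and its methods are hypothetical names. A late message opens a transaction that no marker will ever close, so the LSO stays pinned at that offset no matter how far the high watermark moves.

```java
import java.util.HashMap;
import java.util.Map;

final class LsoSketch {
    // producerId -> first offset of that producer's open transaction on this partition
    private final Map<Long, Long> openTxnFirstOffset = new HashMap<>();

    /** A transactional append opens a transaction here if one is not already open. */
    void append(long producerId, long offset) {
        openTxnFirstOffset.putIfAbsent(producerId, offset);
    }

    /** A commit/abort marker closes the producer's open transaction on this partition. */
    void writeMarker(long producerId) {
        openTxnFirstOffset.remove(producerId);
    }

    /** LSO = min(high watermark, first offset of every still-open transaction). */
    long lastStableOffset(long highWatermark) {
        long lso = highWatermark;
        for (long first : openTxnFirstOffset.values()) {
            lso = Math.min(lso, first);
        }
        return lso;
    }
}
```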
> > > > > >>>>>>>>
> > > > > >>>>>>>> 20)
> > > > > >>>>>>>>
> > > > > >>>>>>>> The client side change for 2 is removing the addPartitions to
> > > > > >>>>>>>> transaction call. We don't need to make this from the producer to
> > > > > >>>>>>>> the txn coordinator, only server side.
> > > > > >>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>>> In my opinion, the issue with the addPartitionsToTxn call for older
> > > > > >>>>>>>> clients is that we don't have the epoch bump, so we don't know if
> > > > > >>>>>>>> the message belongs to the previous transaction or this one. We need
> > > > > >>>>>>>> to check if the partition has been added to this transaction. Of
> > > > > >>>>>>>> course, this means we won't completely cover the case where we have
> > > > > >>>>>>>> a really late message and we have added the partition to the new
> > > > > >>>>>>>> transaction, but that's unfortunately something we will need the new
> > > > > >>>>>>>> clients to cover.
> > > > > >>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>>> 30)
> > > > > >>>>>>>>
> > > > > >>>>>>>> Transaction is ongoing = partition was added to transaction via
> > > > > >>>>>>>> addPartitionsToTxn. We check this with the DescribeTransactions
> > > > > >>>>>>>> call. Let me know if this wasn't sufficiently explained here:
> > > > > >>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense#KIP890:TransactionsServerSideDefense-EnsureOngoingTransactionforOlderClients(3)
> > > > > >>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>>> 40)
> > > > > >>>>>>>>
> > > > > >>>>>>>> The idea here is that if any messages somehow come in before we get
> > > > > >>>>>>>> the new epoch to the producer, they will be fenced. However, if we
> > > > > >>>>>>>> don't think this is necessary, it can be discussed.
> > > > > >>>>>>>>
> > > > > >>>>>>>>
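The fencing idea in point 40 can be sketched as follows, assuming the partition leader tracks one expected epoch per producer and that a transaction marker carrying the bumped epoch raises it. `PartitionLeader` and the `"fenced"`/`"ok"` results are hypothetical simplifications, not Kafka's real code or error names. The point is that writing the marker with the bumped epoch lets the leader reject old-epoch batches before the producer has even learned the new epoch; a marker with the current epoch would let a late epoch-5 batch through.

```python
class PartitionLeader:
    """Hypothetical leader-side view of one producer's epoch."""

    def __init__(self, producer_epoch: int) -> None:
        self.producer_epoch = producer_epoch

    def write_marker(self, marker_epoch: int) -> None:
        # A commit/abort marker carrying the bumped epoch raises the
        # epoch this leader will accept from now on.
        self.producer_epoch = max(self.producer_epoch, marker_epoch)

    def append(self, batch_epoch: int) -> str:
        # Any batch still carrying the pre-bump epoch is rejected.
        if batch_epoch < self.producer_epoch:
            return "fenced"
        return "ok"


leader = PartitionLeader(producer_epoch=5)
leader.write_marker(marker_epoch=6)  # coordinator bumped 5 -> 6 when ending the txn
print(leader.append(batch_epoch=5))  # fenced: late message from the old transaction
print(leader.append(batch_epoch=6))  # ok: producer has received the new epoch
```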
> > > > > >>>>>>>> 50)
> > > > > >>>>>>>>
> > > > > >>>>>>>> It should be synchronous because if we have an event (i.e., an
> > > > > >>>>>>>> error) that causes us to need to abort the transaction, we need
> > > > > >>>>>>>> to know which partitions to send transaction markers to. We know
> > > > > >>>>>>>> the partitions because we added them to the coordinator via the
> > > > > >>>>>>>> addPartitionsToTxn call. We have had asynchronous calls in the
> > > > > >>>>>>>> past (i.e., writing the commit markers when the transaction is
> > > > > >>>>>>>> completed), but this often just causes confusion because we need
> > > > > >>>>>>>> to wait for some operations to complete. In the commit-marker
> > > > > >>>>>>>> case, clients often see CONCURRENT_TRANSACTIONS errors, and that
> > > > > >>>>>>>> can be confusing. For that reason, it may be simpler to just have
> > > > > >>>>>>>> synchronous calls, especially if we need to block on some
> > > > > >>>>>>>> operation's completion anyway before we can start the next
> > > > > >>>>>>>> transaction. And yes, I meant coordinator. I will fix that.
> > > > > >>>>>>>>
> > > > > >>>>>>>>
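The reasoning in point 50 (markers can only go to partitions the coordinator already knows about) can be illustrated with a toy model. Everything here is hypothetical: `Coordinator`, `produce_sync`, and `produce_async` are illustration-only names, and the "asynchronous" variant simply defers registration to show the hazard, not how Kafka would implement it.

```python
class Coordinator:
    """Hypothetical coordinator state for one ongoing transaction."""

    def __init__(self) -> None:
        self.partitions: set[str] = set()

    def add_partition(self, partition: str) -> None:
        self.partitions.add(partition)  # idempotent, like a repeated add

    def abort(self) -> list[str]:
        # Markers can only be sent to partitions the coordinator knows about.
        targets = sorted(self.partitions)
        self.partitions.clear()
        return targets


def produce_sync(coord: Coordinator, log: dict, partition: str, record: str) -> None:
    coord.add_partition(partition)                # registration completes first
    log.setdefault(partition, []).append(record)  # then the data is written


def produce_async(coord: Coordinator, log: dict, pending: list,
                  partition: str, record: str) -> None:
    log.setdefault(partition, []).append(record)  # data is written now
    pending.append(partition)                     # registration is deferred


coord, log, pending = Coordinator(), {}, []
produce_sync(coord, log, "orders-0", "a")
produce_async(coord, log, pending, "orders-1", "b")
print(coord.abort())  # ['orders-0']: orders-1 has data but would get no marker
```

If an abort happens before the deferred registration runs, `orders-1` keeps transactional data with no abort marker, which is the kind of hanging transaction the KIP is trying to prevent.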
> > > > > >>>>>>>> 60)
> > > > > >>>>>>>>
> > > > > >>>>>>>> When we are checking if the transaction is ongoing, we need to
> > > > > >>>>>>>> make a round trip from the partition leader to the transaction
> > > > > >>>>>>>> coordinator. While we are waiting for the response, in theory a
> > > > > >>>>>>>> commit/abort could have been sent that makes the original result
> > > > > >>>>>>>> of the check out of date. That is why we check the leader state
> > > > > >>>>>>>> before we write to the log.
> > > > > >>>>>>>>
> > > > > >>>>>>>>
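One way to picture the race in point 60 is shown below. This is a sketch only: the email says the leader checks its own state before writing, but the token mechanism, the `Leader` class, and its method names are assumptions made for illustration. The leader records a token when it starts the coordinator round trip. Writing a commit/abort marker invalidates that token, so a stale "ongoing" answer is ignored.

```python
class Leader:
    """Hypothetical partition leader that re-checks local state after
    the round trip to the transaction coordinator."""

    def __init__(self) -> None:
        self.log: list[tuple[int, str]] = []
        self.in_flight: dict[int, int] = {}  # producer_id -> check token
        self._next_token = 0

    def start_check(self, producer_id: int) -> int:
        self._next_token += 1
        self.in_flight[producer_id] = self._next_token
        return self._next_token

    def write_marker(self, producer_id: int, marker: str) -> None:
        # A commit/abort marker ends the transaction on this partition,
        # so any check started before it is now stale.
        self.in_flight.pop(producer_id, None)
        self.log.append((producer_id, marker))

    def finish_check(self, producer_id: int, token: int,
                     coordinator_says_ongoing: bool, record: str) -> bool:
        # The coordinator's answer is only trusted if no marker was
        # written for this producer while the request was in flight.
        if coordinator_says_ongoing and self.in_flight.get(producer_id) == token:
            self.log.append((producer_id, record))
            return True
        return False


leader = Leader()
token = leader.start_check(producer_id=1)
leader.write_marker(producer_id=1, marker="ABORT")  # lands during the round trip
print(leader.finish_check(1, token, coordinator_says_ongoing=True, record="late"))  # False
```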
> > > > > >>>>>>>> I'm happy to update the KIP if some of these things were not clear.
> > > > > >>>>>>>> Thanks,
> > > > > >>>>>>>> Justine
> > > > > >>>>>>>>
> > > > > >>>>>>>> On Mon, Nov 21, 2022 at 7:11 PM Matthias J. Sax <mj...@apache.org> wrote:
> > > > > >>>>>>>>
> > > > > >>>>>>>>> Thanks for the KIP.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> A couple of clarification questions (I am not a broker expert, so
> > > > > >>>>>>>>> maybe some questions are obvious to others, but not to me with my
> > > > > >>>>>>>>> lack of broker knowledge).
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> (10)
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> The delayed message case can also violate EOS if the delayed
> > > > > >>>>>>>>>> message comes in after the next addPartitionsToTxn request comes
> > > > > >>>>>>>>>> in. Effectively we may see a message from a previous (aborted)
> > > > > >>>>>>>>>> transaction become part of the next transaction.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> What happens if the message comes in before the next
> > > > > >>>>>>>>> addPartitionsToTxn request? It seems the broker hosting the data
> > > > > >>>>>>>>> partitions won't know anything about it and will append it to the
> > > > > >>>>>>>>> partition, too? What is the difference between the two cases?
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> Also, it seems a TX would only hang if there is no following TX
> > > > > >>>>>>>>> that is either committed or aborted? Thus, for the case above, the
> > > > > >>>>>>>>> TX might actually not hang (of course, we might get an EOS
> > > > > >>>>>>>>> violation if the first TX was aborted and the second committed, or
> > > > > >>>>>>>>> the other way around).
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> (20)
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> Of course, 1 and 2 require client-side changes, so for older
> > > > > >>>>>>>>>> clients, those approaches won’t apply.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> For (1) I understand why a client change is necessary, but I am
> > > > > >>>>>>>>> not sure why we need a client change for (2). Can you elaborate?
> > > > > >>>>>>>>> Later you explain that we should send a DescribeTransactionRequest,
> > > > > >>>>>>>>> but I am not sure why. Can't we just do an implicit
> > > > > >>>>>>>>> AddPartitionsToTxn, too? If the old producer already registered
> > > > > >>>>>>>>> the partition correctly, the TX-coordinator can just ignore it, as
> > > > > >>>>>>>>> it's an idempotent operation?
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> (30)
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> To cover older clients, we will ensure a transaction is ongoing
> > > > > >>>>>>>>>> before we write to a transaction
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> Not sure what you mean by this? Can you elaborate?
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> (40)
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> [the TX-coordinator] will write the prepare commit message with a
> > > > > >>>>>>>>>> bumped epoch and send WriteTxnMarkerRequests with the bumped epoch.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> Why do we use the bumped epoch for both? It seems more intuitive to
> > > > > >>>>>>>>> use the current epoch, and only return the bumped epoch to the
> > > > > >>>>>>>>> producer?
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> (50) "Implicit AddPartitionToTransaction"
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> Why does the implicitly sent request need to be synchronous? The
> > > > > >>>>>>>>> KIP also says
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> in case we need to abort and need to know which partitions
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> What do you mean by this?
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> we don’t want to write to it before we store in the transaction manager
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> Do you mean TX-coordinator instead of "manager"?
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> (60)
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> For older clients and ensuring that the TX is ongoing, you describe
> > > > > >>>>>>>>> a race condition. I am not sure I can follow here. Can you
> > > > > >>>>>>>>> elaborate?
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> -Matthias
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> On 11/18/22 1:21 PM, Justine Olshan wrote:
> > > > > >>>>>>>>>> Hey all!
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> I'd like to start a discussion on my proposal to add some
> > > > > >>>>>>>>>> server-side checks on transactions to avoid hanging transactions.
> > > > > >>>>>>>>>> I know this has been an issue for some time, so I really hope this
> > > > > >>>>>>>>>> KIP will be helpful for many users of EOS.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> The KIP includes changes that will be compatible with old clients
> > > > > >>>>>>>>>> and changes to improve performance and correctness on new clients.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Please take a look and leave any comments you may have!
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense
> > > > > >>>>>>>>>> JIRA: https://issues.apache.org/jira/browse/KAFKA-14402
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Thanks!
> > > > > >>>>>>>>>> Justine
> > > > > >>>>>>>>>>