Hi, Justine,

Thanks for the explanation.

70. The proposed fencing logic doesn't apply when the pid changes, is that right? If so, I am not sure how completely we are addressing this issue if the pid changes more frequently.

Thanks,

Jun

On Fri, Dec 16, 2022 at 9:16 AM Justine Olshan <jols...@confluent.io.invalid> wrote:

> Hi Jun,
>
> Thanks for replying!
>
> 70. We already do the overflow mechanism, so my change would just make it happen more often. I was also not suggesting a new field in the log, but in the response, which would be gated by the client version. Sorry if something there is unclear. I think we are starting to diverge. The goal of this KIP is to not change the marker format at all.
>
> 71. Yes, I guess I was going under the assumption that the log would just look at its last epoch and treat it as the current epoch. I suppose we can have some special logic that if the last epoch was on a marker we actually expect the next epoch or something like that. We just need to distinguish based on whether we had a commit/abort marker.
>
> 72.
>
> > if the producer epoch hasn't been bumped on the broker, it seems that the stuck message will fail the sequence validation and will be ignored. If the producer epoch has been bumped, we ignore the sequence check and the stuck message could be appended to the log. So, is the latter case that we want to guard?
>
> I'm not sure I follow that "the message will fail the sequence validation". In some of these cases, we had an abort marker (due to an error) and then the late message comes in with the correct sequence number. This is a case covered by the KIP. The latter case is actually not something we've considered here. I think generally when we bump the epoch, we are accepting that the sequence does not need to be checked anymore. My understanding is also that we don't typically bump the epoch mid-transaction (based on a quick look at the code), but let me know if that is the case.
>
> Thanks,
> Justine
>
> On Thu, Dec 15, 2022 at 12:23 PM Jun Rao <j...@confluent.io.invalid> wrote:
>
> > Hi, Justine,
> >
> > Thanks for the reply.
> >
> > 70. Assigning a new pid on int overflow seems a bit hacky. If we need a txn-level id, it will be better to model this explicitly. Adding a new field would require a bit more work since it requires a new txn marker format in the log. So, we probably need to guard it with an IBP or metadata version and document the impact on downgrade once the new format is written to the log.
> >
> > 71. Hmm, once the marker is written, the partition will expect the next append to be on the next epoch. Does that cover the case you mentioned?
> >
> > 72. Also, just to be clear on the stuck message issue described in the motivation. With EOS, we also validate the sequence id for idempotency. So, with the current logic, if the producer epoch hasn't been bumped on the broker, it seems that the stuck message will fail the sequence validation and will be ignored. If the producer epoch has been bumped, we ignore the sequence check and the stuck message could be appended to the log. So, is the latter case that we want to guard?
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Dec 14, 2022 at 10:44 AM Justine Olshan <jols...@confluent.io.invalid> wrote:
> >
> > > Matthias — thanks again for taking the time to look at this. You said:
> > >
> > > > My proposal was only focusing to avoid dangling transactions if records are added without registered partition.
-- > Maybe > > > > > > you can add a few more details to the KIP about this scenario for > better > > > > > > documentation purpose? > > > > > > > > > I'm not sure I understand what you mean here. The motivation section > > > describes two scenarios about how the record can be added without a > > > registered partition: > > > > > > > > > > This can happen when a message gets stuck or delayed due to > networking > > > issues or a network partition, the transaction aborts, and then the > > delayed > > > message finally comes in. > > > > > > > > > > Another way hanging transactions can occur is that a client is buggy > > and > > > may somehow try to write to a partition before it adds the partition to > > the > > > transaction. > > > > > > > > > > > > For the first example of this would it be helpful to say that this > > message > > > comes in after the abort, but before the partition is added to the next > > > transaction so it becomes "hanging." Perhaps the next sentence > describing > > > the message becoming part of the next transaction (a different case) > was > > > not properly differentiated. > > > > > > > > > > > > Jun — thanks for reading the KIP. > > > > > > 70. The int typing was a concern. Currently we have a mechanism in > place > > to > > > fence the final epoch when the epoch is about to overflow and assign a > > new > > > producer ID with epoch 0. Of course, this is a bit tricky when it comes > > to > > > the response back to the client. > > > Making this a long could be another option, but I wonder are there any > > > implications on changing this field if the epoch is persisted to disk? > > I'd > > > need to check the usages. > > > > > > 71.This was something Matthias asked about as well. I was considering a > > > possible edge case where a produce request from a new transaction > somehow > > > gets sent right after the marker is written, but before the producer is > > > alerted of the newly bumped epoch. In this case, we may include this > > record > > > when we don't want to. I suppose we could try to do something client > side > > > to bump the epoch after sending an endTxn as well in this scenario — > but > > I > > > wonder how it would work when the server is aborting based on a > > server-side > > > error. I could also be missing something and this scenario is actually > > not > > > possible. > > > > > > Thanks again to everyone reading and commenting. Let me know about any > > > further questions or comments. > > > > > > Justine > > > > > > On Wed, Dec 14, 2022 at 9:41 AM Jun Rao <j...@confluent.io.invalid> > > wrote: > > > > > > > Hi, Justine, > > > > > > > > Thanks for the KIP. A couple of comments. > > > > > > > > 70. Currently, the producer epoch is an int. I am not sure if it's > > enough > > > > to accommodate all transactions in the lifetime of a producer. Should > > we > > > > change that to a long or add a new long field like txnId? > > > > > > > > 71. "it will write the prepare commit message with a bumped epoch and > > > send > > > > WriteTxnMarkerRequests with the bumped epoch." Hmm, the epoch is > > > associated > > > > with the current txn right? So, it seems weird to write a commit > > message > > > > with a bumped epoch. Should we only bump up the epoch in > EndTxnResponse > > > and > > > > rename the field to sth like nextProducerEpoch? > > > > > > > > Thanks, > > > > > > > > Jun > > > > > > > > > > > > > > > > On Mon, Dec 12, 2022 at 8:54 PM Matthias J. Sax <mj...@apache.org> > > > wrote: > > > > > > > > > Thanks for the background. 
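As an illustration of the overflow mechanism Justine references (the producer epoch is a 16-bit value, so instead of wrapping, the coordinator fences the final epoch and allocates a fresh producer id starting at epoch 0), here is a minimal sketch. The names below (ProducerIdAndEpoch, ProducerIdManager, nextProducerId) are invented for the example and are not the broker's actual classes:

// Sketch only: when a bump would overflow the 16-bit epoch, fence the old
// producer id at its final epoch and hand out a new producer id at epoch 0.
public final class EpochOverflowSketch {

    record ProducerIdAndEpoch(long producerId, short epoch) {}

    interface ProducerIdManager {
        long nextProducerId();   // allocates a new, never-used producer id
    }

    static ProducerIdAndEpoch bumpEpoch(ProducerIdAndEpoch current, ProducerIdManager idManager) {
        if (current.epoch() == Short.MAX_VALUE) {
            // Final epoch reached: the old (pid, epoch) pair is fenced and a new pid starts at 0.
            return new ProducerIdAndEpoch(idManager.nextProducerId(), (short) 0);
        }
        return new ProducerIdAndEpoch(current.producerId(), (short) (current.epoch() + 1));
    }

    public static void main(String[] args) {
        ProducerIdManager ids = () -> 4242L; // stand-in allocator
        System.out.println(bumpEpoch(new ProducerIdAndEpoch(7L, (short) 5), ids));
        System.out.println(bumpEpoch(new ProducerIdAndEpoch(7L, Short.MAX_VALUE), ids));
    }
}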
> > > > > > > > > > 20/30: SGTM. My proposal was only focusing to avoid dangling > > > > > transactions if records are added without registered partition. -- > > > Maybe > > > > > you can add a few more details to the KIP about this scenario for > > > better > > > > > documentation purpose? > > > > > > > > > > 40: I think you hit a fair point about race conditions or client > bugs > > > > > (incorrectly not bumping the epoch). The complexity/confusion for > > using > > > > > the bumped epoch I see, is mainly for internal debugging, ie, > > > inspecting > > > > > log segment dumps -- it seems harder to reason about the system for > > us > > > > > humans. But if we get better guarantees, it would be worth to use > the > > > > > bumped epoch. > > > > > > > > > > 60: as I mentioned already, I don't know the broker internals to > > > provide > > > > > more input. So if nobody else chimes in, we should just move > forward > > > > > with your proposal. > > > > > > > > > > > > > > > -Matthias > > > > > > > > > > > > > > > On 12/6/22 4:22 PM, Justine Olshan wrote: > > > > > > Hi all, > > > > > > After Artem's questions about error behavior, I've re-evaluated > the > > > > > > unknown producer ID exception and had some discussions offline. > > > > > > > > > > > > I think generally it makes sense to simplify error handling in > > cases > > > > like > > > > > > this and the UNKNOWN_PRODUCER_ID error has a pretty long and > > > > complicated > > > > > > history. Because of this, I propose adding a new error code > > > > > ABORTABLE_ERROR > > > > > > that when encountered by new clients (gated by the produce > request > > > > > version) > > > > > > will simply abort the transaction. This allows the server to have > > > some > > > > > say > > > > > > in whether the client aborts and makes handling much simpler. In > > the > > > > > > future, we can also use this error in other situations where we > > want > > > to > > > > > > abort the transactions. We can even use on other apis. > > > > > > > > > > > > I've added this to the KIP. Let me know if there are any > questions > > or > > > > > > issues. > > > > > > > > > > > > Justine > > > > > > > > > > > > On Fri, Dec 2, 2022 at 10:22 AM Justine Olshan < > > jols...@confluent.io > > > > > > > > > wrote: > > > > > > > > > > > >> Hey Matthias, > > > > > >> > > > > > >> > > > > > >> 20/30 — Maybe I also didn't express myself clearly. For older > > > clients > > > > we > > > > > >> don't have a way to distinguish between a previous and the > current > > > > > >> transaction since we don't have the epoch bump. This means that > a > > > late > > > > > >> message from the previous transaction may be added to the new > one. > > > > With > > > > > >> older clients — we can't guarantee this won't happen if we > already > > > > sent > > > > > the > > > > > >> addPartitionsToTxn call (why we make changes for the newer > client) > > > but > > > > > we > > > > > >> can at least gate some by ensuring that the partition has been > > added > > > > to > > > > > the > > > > > >> transaction. The rationale here is that there are likely LESS > late > > > > > arrivals > > > > > >> as time goes on, so hopefully most late arrivals will come in > > BEFORE > > > > the > > > > > >> addPartitionsToTxn call. Those that arrive before will be > properly > > > > gated > > > > > >> with the describeTransactions approach. > > > > > >> > > > > > >> If we take the approach you suggested, ANY late arrival from a > > > > previous > > > > > >> transaction will be added. And we don't want that. 
I also don't > > see > > > > any > > > > > >> benefit in sending addPartitionsToTxn over the describeTxns > call. > > > They > > > > > will > > > > > >> both be one extra RPC to the Txn coordinator. > > > > > >> > > > > > >> > > > > > >> To be clear — newer clients will use addPartitionsToTxn instead > of > > > the > > > > > >> DescribeTxns. > > > > > >> > > > > > >> > > > > > >> 40) > > > > > >> My concern is that if we have some delay in the client to bump > the > > > > > epoch, > > > > > >> it could continue to send epoch 73 and those records would not > be > > > > > fenced. > > > > > >> Perhaps this is not an issue if we don't allow the next produce > to > > > go > > > > > >> through before the EndTxn request returns. I'm also thinking > about > > > > > cases of > > > > > >> failure. I will need to think on this a bit. > > > > > >> > > > > > >> I wasn't sure if it was that confusing. But if we think it is, > we > > > can > > > > > >> investigate other ways. > > > > > >> > > > > > >> > > > > > >> 60) > > > > > >> > > > > > >> I'm not sure these are the same purgatories since one is a > produce > > > > > >> purgatory (I was planning on using a callback rather than > > purgatory) > > > > and > > > > > >> the other is simply a request to append to the log. Not sure we > > have > > > > any > > > > > >> structure here for ordering, but my understanding is that the > > broker > > > > > could > > > > > >> handle the write request before it hears back from the Txn > > > > Coordinator. > > > > > >> > > > > > >> Let me know if I misunderstood something or something was > unclear. > > > > > >> > > > > > >> Justine > > > > > >> > > > > > >> On Thu, Dec 1, 2022 at 12:15 PM Matthias J. Sax < > mj...@apache.org > > > > > > > > wrote: > > > > > >> > > > > > >>> Thanks for the details Justine! > > > > > >>> > > > > > >>>> 20) > > > > > >>>> > > > > > >>>> The client side change for 2 is removing the addPartitions to > > > > > >>> transaction > > > > > >>>> call. We don't need to make this from the producer to the txn > > > > > >>> coordinator, > > > > > >>>> only server side. > > > > > >>> > > > > > >>> I think I did not express myself clearly. I understand that we > > can > > > > (and > > > > > >>> should) change the producer to not send the `addPartitions` > > request > > > > any > > > > > >>> longer. But I don't thinks it's requirement to change the > broker? > > > > > >>> > > > > > >>> What I am trying to say is: as a safe-guard and improvement for > > > older > > > > > >>> producers, the partition leader can just send the > `addPartitions` > > > > > >>> request to the TX-coordinator in any case -- if the old > producer > > > > > >>> correctly did send the `addPartition` request to the > > TX-coordinator > > > > > >>> already, the TX-coordinator can just "ignore" is as idempotent. > > > > > However, > > > > > >>> if the old producer has a bug and did forget to sent the > > > > `addPartition` > > > > > >>> request, we would now ensure that the partition is indeed added > > to > > > > the > > > > > >>> TX and thus fix a potential producer bug (even if we don't get > > the > > > > > >>> fencing via the bump epoch). -- It seems to be a good > > improvement? > > > Or > > > > > is > > > > > >>> there a reason to not do this? > > > > > >>> > > > > > >>> > > > > > >>> > > > > > >>>> 30) > > > > > >>>> > > > > > >>>> Transaction is ongoing = partition was added to transaction > via > > > > > >>>> addPartitionsToTxn. We check this with the > DescribeTransactions > > > > call. 
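As an illustration of the check just described for older clients (the partition leader confirming, via the DescribeTransactions round trip, that the partition was added to the ongoing transaction before appending), here is a minimal sketch. The coordinator interface and method names are stand-ins, not the real request classes:

import java.util.Set;

// Sketch only: reject a transactional append if the partition was never
// registered with the transaction coordinator, to avoid a hanging transaction.
public final class OngoingTxnCheckSketch {

    interface TxnCoordinatorClient {
        // Partitions currently registered for the producer's ongoing transaction.
        Set<String> describeTransaction(String transactionalId);
    }

    static boolean safeToAppend(TxnCoordinatorClient coordinator,
                                String transactionalId,
                                String topicPartition) {
        return coordinator.describeTransaction(transactionalId).contains(topicPartition);
    }

    public static void main(String[] args) {
        TxnCoordinatorClient coordinator = txnId -> Set.of("topicA-0", "topicA-1");
        System.out.println(safeToAppend(coordinator, "my-txn-id", "topicA-0")); // true
        System.out.println(safeToAppend(coordinator, "my-txn-id", "topicB-3")); // false
    }
}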
> > > > > >>> Let > > > > > >>>> me know if this wasn't sufficiently explained here: > > > > > >>> > > > > > >>> If we do what I propose in (20), we don't really need to make > > this > > > > > >>> `DescribeTransaction` call, as the partition leader adds the > > > > partition > > > > > >>> for older clients and we get this check for free. > > > > > >>> > > > > > >>> > > > > > >>>> 40) > > > > > >>>> > > > > > >>>> The idea here is that if any messages somehow come in before > we > > > get > > > > > the > > > > > >>> new > > > > > >>>> epoch to the producer, they will be fenced. However, if we > don't > > > > think > > > > > >>> this > > > > > >>>> is necessary, it can be discussed > > > > > >>> > > > > > >>> I agree that we should have epoch fencing. My question is > > > different: > > > > > >>> Assume we are at epoch 73, and we have an ongoing transaction, > > that > > > > is > > > > > >>> committed. It seems natural to write the "prepare commit" > marker > > > and > > > > > the > > > > > >>> `WriteTxMarkerRequest` both with epoch 73, too, as it belongs > to > > > the > > > > > >>> current transaction. Of course, we now also bump the epoch and > > > expect > > > > > >>> the next requests to have epoch 74, and would reject an request > > > with > > > > > >>> epoch 73, as the corresponding TX for epoch 73 was already > > > committed. > > > > > >>> > > > > > >>> It seems you propose to write the "prepare commit marker" and > > > > > >>> `WriteTxMarkerRequest` with epoch 74 though, what would work, > but > > > it > > > > > >>> seems confusing. Is there a reason why we would use the bumped > > > epoch > > > > 74 > > > > > >>> instead of the current epoch 73? > > > > > >>> > > > > > >>> > > > > > >>>> 60) > > > > > >>>> > > > > > >>>> When we are checking if the transaction is ongoing, we need to > > > make > > > > a > > > > > >>> round > > > > > >>>> trip from the leader partition to the transaction coordinator. > > In > > > > the > > > > > >>> time > > > > > >>>> we are waiting for this message to come back, in theory we > could > > > > have > > > > > >>> sent > > > > > >>>> a commit/abort call that would make the original result of the > > > check > > > > > >>> out of > > > > > >>>> date. That is why we can check the leader state before we > write > > to > > > > the > > > > > >>> log. > > > > > >>> > > > > > >>> Thanks. Got it. > > > > > >>> > > > > > >>> However, is this really an issue? We put the produce request in > > > > > >>> purgatory, so how could we process the `WriteTxnMarkerRequest` > > > first? > > > > > >>> Don't we need to put the `WriteTxnMarkerRequest` into > purgatory, > > > too, > > > > > >>> for this case, and process both request in-order? (Again, my > > broker > > > > > >>> knowledge is limited and maybe we don't maintain request order > > for > > > > this > > > > > >>> case, what seems to be an issue IMHO, and I am wondering if > > > changing > > > > > >>> request handling to preserve order for this case might be the > > > cleaner > > > > > >>> solution?) 
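To picture the race discussed in (60): the coordinator's answer about whether the transaction is ongoing can go stale while the produce request waits, so the leader consults its own partition state again right before appending. A minimal sketch under that assumption; the field and method names are illustrative only:

import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

// Sketch only: combine the (possibly stale) coordinator answer with the
// leader's latest local view of the open transaction before writing.
public final class RecheckBeforeAppendSketch {

    // Leader-local view of the open transaction for one partition, if any.
    static final AtomicReference<Optional<Long>> currentTxnFirstOffset =
            new AtomicReference<>(Optional.of(100L));

    static boolean appendTransactionalRecord(boolean coordinatorSaidOngoing) {
        boolean stillOngoingLocally = currentTxnFirstOffset.get().isPresent();
        return coordinatorSaidOngoing && stillOngoingLocally;
    }

    public static void main(String[] args) {
        System.out.println(appendTransactionalRecord(true));  // true: still ongoing locally
        currentTxnFirstOffset.set(Optional.empty());          // a marker completed the txn meanwhile
        System.out.println(appendTransactionalRecord(true));  // false: coordinator answer is stale
    }
}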
> > > > > >>> > > > > > >>> > > > > > >>> > > > > > >>> -Matthias > > > > > >>> > > > > > >>> > > > > > >>> > > > > > >>> > > > > > >>> On 11/30/22 3:28 PM, Artem Livshits wrote: > > > > > >>>> Hi Justine, > > > > > >>>> > > > > > >>>> I think the interesting part is not in this logic (because it > > > tries > > > > to > > > > > >>>> figure out when UNKNOWN_PRODUCER_ID is retriable and if it's > > > > > retryable, > > > > > >>>> it's definitely not fatal), but what happens when this logic > > > doesn't > > > > > >>> return > > > > > >>>> 'true' and falls through. In the old clients it seems to be > > > fatal, > > > > if > > > > > >>> we > > > > > >>>> keep the behavior in the new clients, I'd expect it would be > > fatal > > > > as > > > > > >>> well. > > > > > >>>> > > > > > >>>> -Artem > > > > > >>>> > > > > > >>>> On Tue, Nov 29, 2022 at 11:57 AM Justine Olshan > > > > > >>>> <jols...@confluent.io.invalid> wrote: > > > > > >>>> > > > > > >>>>> Hi Artem and Jeff, > > > > > >>>>> > > > > > >>>>> > > > > > >>>>> Thanks for taking a look and sorry for the slow response. > > > > > >>>>> > > > > > >>>>> You both mentioned the change to handle UNKNOWN_PRODUCER_ID > > > errors. > > > > > To > > > > > >>> be > > > > > >>>>> clear — this error code will only be sent again when the > > client's > > > > > >>> request > > > > > >>>>> version is high enough to ensure we handle it correctly. > > > > > >>>>> The current (Java) client handles this by the following > > (somewhat > > > > > long) > > > > > >>>>> code snippet: > > > > > >>>>> > > > > > >>>>> // An UNKNOWN_PRODUCER_ID means that we have lost the > producer > > > > state > > > > > >>> on the > > > > > >>>>> broker. Depending on the log start > > > > > >>>>> > > > > > >>>>> // offset, we may want to retry these, as described for each > > case > > > > > >>> below. If > > > > > >>>>> none of those apply, then for the > > > > > >>>>> > > > > > >>>>> // idempotent producer, we will locally bump the epoch and > > reset > > > > the > > > > > >>>>> sequence numbers of in-flight batches from > > > > > >>>>> > > > > > >>>>> // sequence 0, then retry the failed batch, which should now > > > > succeed. > > > > > >>> For > > > > > >>>>> the transactional producer, allow the > > > > > >>>>> > > > > > >>>>> // batch to fail. When processing the failed batch, we will > > > > > transition > > > > > >>> to > > > > > >>>>> an abortable error and set a flag > > > > > >>>>> > > > > > >>>>> // indicating that we need to bump the epoch (if supported by > > the > > > > > >>> broker). > > > > > >>>>> > > > > > >>>>> if (error == Errors.*UNKNOWN_PRODUCER_ID*) { > > > > > >>>>> > > > > > >>>>> if (response.logStartOffset == -1) { > > > > > >>>>> > > > > > >>>>> // We don't know the log start offset with this > > > response. > > > > > We > > > > > >>> should > > > > > >>>>> just retry the request until we get it. > > > > > >>>>> > > > > > >>>>> // The UNKNOWN_PRODUCER_ID error code was added > along > > > > with > > > > > >>> the new > > > > > >>>>> ProduceResponse which includes the > > > > > >>>>> > > > > > >>>>> // logStartOffset. So the '-1' sentinel is not for > > > > backward > > > > > >>>>> compatibility. 
Instead, it is possible for > > > > > >>>>> > > > > > >>>>> // a broker to not know the logStartOffset at when > it > > > is > > > > > >>> returning > > > > > >>>>> the response because the partition > > > > > >>>>> > > > > > >>>>> // may have moved away from the broker from the > time > > > the > > > > > >>> error was > > > > > >>>>> initially raised to the time the > > > > > >>>>> > > > > > >>>>> // response was being constructed. In these cases, > we > > > > > should > > > > > >>> just > > > > > >>>>> retry the request: we are guaranteed > > > > > >>>>> > > > > > >>>>> // to eventually get a logStartOffset once things > > > settle > > > > > down. > > > > > >>>>> > > > > > >>>>> return true; > > > > > >>>>> > > > > > >>>>> } > > > > > >>>>> > > > > > >>>>> > > > > > >>>>> if (batch.sequenceHasBeenReset()) { > > > > > >>>>> > > > > > >>>>> // When the first inflight batch fails due to the > > > > > truncation > > > > > >>> case, > > > > > >>>>> then the sequences of all the other > > > > > >>>>> > > > > > >>>>> // in flight batches would have been restarted from > > the > > > > > >>> beginning. > > > > > >>>>> However, when those responses > > > > > >>>>> > > > > > >>>>> // come back from the broker, they would also come > > with > > > > an > > > > > >>>>> UNKNOWN_PRODUCER_ID error. In this case, we should not > > > > > >>>>> > > > > > >>>>> // reset the sequence numbers to the beginning. > > > > > >>>>> > > > > > >>>>> return true; > > > > > >>>>> > > > > > >>>>> } else if > (lastAckedOffset(batch.topicPartition).orElse( > > > > > >>>>> *NO_LAST_ACKED_SEQUENCE_NUMBER*) < response.logStartOffset) { > > > > > >>>>> > > > > > >>>>> // The head of the log has been removed, probably > due > > > to > > > > > the > > > > > >>>>> retention time elapsing. In this case, > > > > > >>>>> > > > > > >>>>> // we expect to lose the producer state. For the > > > > > transactional > > > > > >>>>> producer, reset the sequences of all > > > > > >>>>> > > > > > >>>>> // inflight batches to be from the beginning and > > retry > > > > > them, > > > > > >>> so > > > > > >>>>> that the transaction does not need to > > > > > >>>>> > > > > > >>>>> // be aborted. For the idempotent producer, bump > the > > > > epoch > > > > > to > > > > > >>> avoid > > > > > >>>>> reusing (sequence, epoch) pairs > > > > > >>>>> > > > > > >>>>> if (isTransactional()) { > > > > > >>>>> > > > > > >>>>> > > > > > >>> txnPartitionMap.startSequencesAtBeginning(batch.topicPartition, > > > > > >>>>> this.producerIdAndEpoch); > > > > > >>>>> > > > > > >>>>> } else { > > > > > >>>>> > > > > > >>>>> > > requestEpochBumpForPartition(batch.topicPartition); > > > > > >>>>> > > > > > >>>>> } > > > > > >>>>> > > > > > >>>>> return true; > > > > > >>>>> > > > > > >>>>> } > > > > > >>>>> > > > > > >>>>> > > > > > >>>>> if (!isTransactional()) { > > > > > >>>>> > > > > > >>>>> // For the idempotent producer, always retry > > > > > >>> UNKNOWN_PRODUCER_ID > > > > > >>>>> errors. If the batch has the current > > > > > >>>>> > > > > > >>>>> // producer ID and epoch, request a bump of the > > epoch. > > > > > >>> Otherwise > > > > > >>>>> just retry the produce. > > > > > >>>>> > > > > > >>>>> requestEpochBumpForPartition(batch.topicPartition); > > > > > >>>>> > > > > > >>>>> return true; > > > > > >>>>> > > > > > >>>>> } > > > > > >>>>> > > > > > >>>>> } > > > > > >>>>> > > > > > >>>>> > > > > > >>>>> I was considering keeping this behavior — but am open to > > > > simplifying > > > > > >>> it. 
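Since the snippet quoted above is hard to read through the quote markers, here is the same retry decision restated as a small self-contained sketch. The types are simplified stand-ins; the real logic lives in the Java client's TransactionManager:

// Sketch only: the UNKNOWN_PRODUCER_ID retry decision from the quoted snippet,
// with Response and Batch reduced to the fields the decision actually uses.
public final class UnknownProducerIdRetrySketch {

    record Response(long logStartOffset) {}
    record Batch(boolean sequenceHasBeenReset, long lastAckedOffset) {}

    static boolean shouldRetry(Response response, Batch batch, boolean isTransactional) {
        if (response.logStartOffset() == -1) {
            // The broker did not know the log start offset; retry until it does.
            return true;
        }
        if (batch.sequenceHasBeenReset()) {
            // Sequences were already restarted after the first failure; don't reset again.
            return true;
        } else if (batch.lastAckedOffset() < response.logStartOffset()) {
            // Head of the log was removed (e.g. retention): producer state is legitimately gone.
            // Transactional producer resets sequences; idempotent producer bumps the epoch.
            return true;
        }
        if (!isTransactional) {
            // Idempotent producer always retries, requesting an epoch bump.
            return true;
        }
        // Transactional producer with none of the above: let the batch fail and surface an
        // abortable error (bumping the epoch if the broker supports it).
        return false;
    }

    public static void main(String[] args) {
        System.out.println(shouldRetry(new Response(-1), new Batch(false, 10), true)); // true
        System.out.println(shouldRetry(new Response(50), new Batch(false, 10), true)); // true (log truncated)
        System.out.println(shouldRetry(new Response(5), new Batch(false, 10), true));  // false
    }
}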
> > > > > >>>>> > > > > > >>>>> > > > > > >>>>> > > > > > >>>>> We are leaving changes to older clients off the table here > > since > > > it > > > > > >>> caused > > > > > >>>>> many issues for clients in the past. Previously this was a > > fatal > > > > > error > > > > > >>> and > > > > > >>>>> we didn't have the mechanisms in place to detect when this > was > > a > > > > > >>> legitimate > > > > > >>>>> case vs some bug or gap in the protocol. Ensuring each > > > transaction > > > > > has > > > > > >>> its > > > > > >>>>> own epoch should close this gap. > > > > > >>>>> > > > > > >>>>> > > > > > >>>>> > > > > > >>>>> > > > > > >>>>> And to address Jeff's second point: > > > > > >>>>> *does the typical produce request path append records to > local > > > log > > > > > >>> along* > > > > > >>>>> > > > > > >>>>> *with the currentTxnFirstOffset information? I would like to > > > > > >>> understand* > > > > > >>>>> > > > > > >>>>> *when the field is written to disk.* > > > > > >>>>> > > > > > >>>>> > > > > > >>>>> Yes, the first produce request populates this field and > writes > > > the > > > > > >>> offset > > > > > >>>>> as part of the record batch and also to the producer state > > > > snapshot. > > > > > >>> When > > > > > >>>>> we reload the records on restart and/or reassignment, we > > > repopulate > > > > > >>> this > > > > > >>>>> field with the snapshot from disk along with the rest of the > > > > producer > > > > > >>>>> state. > > > > > >>>>> > > > > > >>>>> Let me know if there are further comments and/or questions. > > > > > >>>>> > > > > > >>>>> Thanks, > > > > > >>>>> Justine > > > > > >>>>> > > > > > >>>>> On Tue, Nov 22, 2022 at 9:00 PM Jeff Kim > > > > > <jeff....@confluent.io.invalid > > > > > >>>> > > > > > >>>>> wrote: > > > > > >>>>> > > > > > >>>>>> Hi Justine, > > > > > >>>>>> > > > > > >>>>>> Thanks for the KIP! I have two questions: > > > > > >>>>>> > > > > > >>>>>> 1) For new clients, we can once again return an error > > > > > >>> UNKNOWN_PRODUCER_ID > > > > > >>>>>> for sequences > > > > > >>>>>> that are non-zero when there is no producer state present on > > the > > > > > >>> server. > > > > > >>>>>> This will indicate we missed the 0 sequence and we don't yet > > > want > > > > to > > > > > >>>>> write > > > > > >>>>>> to the log. > > > > > >>>>>> > > > > > >>>>>> I would like to understand the current behavior to handle > > older > > > > > >>> clients, > > > > > >>>>>> and if there are any changes we are making. Maybe I'm > missing > > > > > >>> something, > > > > > >>>>>> but we would want to identify whether we missed the 0 > sequence > > > for > > > > > >>> older > > > > > >>>>>> clients, no? > > > > > >>>>>> > > > > > >>>>>> 2) Upon returning from the transaction coordinator, we can > set > > > the > > > > > >>>>>> transaction > > > > > >>>>>> as ongoing on the leader by populating currentTxnFirstOffset > > > > > >>>>>> through the typical produce request handling. > > > > > >>>>>> > > > > > >>>>>> does the typical produce request path append records to > local > > > log > > > > > >>> along > > > > > >>>>>> with the currentTxnFirstOffset information? I would like to > > > > > understand > > > > > >>>>>> when the field is written to disk. 
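To illustrate Justine's answer about when currentTxnFirstOffset is set and persisted: the first transactional produce for a partition records the transaction's first offset, that value is carried in the producer state snapshot, and it is restored from the snapshot on restart or reassignment. The class and field names below are illustrative, not the broker's actual producer state manager:

import java.util.Optional;

// Sketch only: per-producer bookkeeping of the first offset of the open transaction.
public final class CurrentTxnFirstOffsetSketch {

    static final class ProducerStateEntry {
        Optional<Long> currentTxnFirstOffset = Optional.empty();

        void onTransactionalAppend(long baseOffset) {
            // Only the first append of the transaction sets the field.
            if (currentTxnFirstOffset.isEmpty()) {
                currentTxnFirstOffset = Optional.of(baseOffset);
            }
        }

        void onTransactionMarker() {
            // A commit/abort marker closes the transaction on this partition.
            currentTxnFirstOffset = Optional.empty();
        }
    }

    public static void main(String[] args) {
        ProducerStateEntry entry = new ProducerStateEntry();
        entry.onTransactionalAppend(120L);
        entry.onTransactionalAppend(125L);
        System.out.println(entry.currentTxnFirstOffset); // Optional[120]
        entry.onTransactionMarker();
        System.out.println(entry.currentTxnFirstOffset); // Optional.empty
    }
}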
> > > > > >>>>>> > > > > > >>>>>> Thanks, > > > > > >>>>>> Jeff > > > > > >>>>>> > > > > > >>>>>> > > > > > >>>>>> On Tue, Nov 22, 2022 at 4:44 PM Artem Livshits > > > > > >>>>>> <alivsh...@confluent.io.invalid> wrote: > > > > > >>>>>> > > > > > >>>>>>> Hi Justine, > > > > > >>>>>>> > > > > > >>>>>>> Thank you for the KIP. I have one question. > > > > > >>>>>>> > > > > > >>>>>>> 5) For new clients, we can once again return an error > > > > > >>>>> UNKNOWN_PRODUCER_ID > > > > > >>>>>>> > > > > > >>>>>>> I believe we had problems in the past with returning > > > > > >>>>> UNKNOWN_PRODUCER_ID > > > > > >>>>>>> because it was considered fatal and required client > restart. > > > It > > > > > >>> would > > > > > >>>>> be > > > > > >>>>>>> good to spell out the new client behavior when it receives > > the > > > > > error. > > > > > >>>>>>> > > > > > >>>>>>> -Artem > > > > > >>>>>>> > > > > > >>>>>>> On Tue, Nov 22, 2022 at 10:00 AM Justine Olshan > > > > > >>>>>>> <jols...@confluent.io.invalid> wrote: > > > > > >>>>>>> > > > > > >>>>>>>> Thanks for taking a look Matthias. I've tried to answer > your > > > > > >>>>> questions > > > > > >>>>>>>> below: > > > > > >>>>>>>> > > > > > >>>>>>>> 10) > > > > > >>>>>>>> > > > > > >>>>>>>> Right — so the hanging transaction only occurs when we > have > > a > > > > late > > > > > >>>>>>> message > > > > > >>>>>>>> come in and the partition is never added to a transaction > > > again. > > > > > If > > > > > >>>>> we > > > > > >>>>>>>> never add the partition to a transaction, we will never > > write > > > a > > > > > >>>>> marker > > > > > >>>>>>> and > > > > > >>>>>>>> never advance the LSO. > > > > > >>>>>>>> > > > > > >>>>>>>> If we do end up adding the partition to the transaction (I > > > > suppose > > > > > >>>>> this > > > > > >>>>>>> can > > > > > >>>>>>>> happen before or after the late message comes in) then we > > will > > > > > >>>>> include > > > > > >>>>>>> the > > > > > >>>>>>>> late message in the next (incorrect) transaction. > > > > > >>>>>>>> > > > > > >>>>>>>> So perhaps it is clearer to make the distinction between > > > > messages > > > > > >>>>> that > > > > > >>>>>>>> eventually get added to the transaction (but the wrong > one) > > or > > > > > >>>>> messages > > > > > >>>>>>>> that never get added and become hanging. > > > > > >>>>>>>> > > > > > >>>>>>>> > > > > > >>>>>>>> 20) > > > > > >>>>>>>> > > > > > >>>>>>>> The client side change for 2 is removing the addPartitions > > to > > > > > >>>>>> transaction > > > > > >>>>>>>> call. We don't need to make this from the producer to the > > txn > > > > > >>>>>>> coordinator, > > > > > >>>>>>>> only server side. > > > > > >>>>>>>> > > > > > >>>>>>>> > > > > > >>>>>>>> In my opinion, the issue with the addPartitionsToTxn call > > for > > > > > older > > > > > >>>>>>> clients > > > > > >>>>>>>> is that we don't have the epoch bump, so we don't know if > > the > > > > > >>> message > > > > > >>>>>>>> belongs to the previous transaction or this one. We need > to > > > > check > > > > > if > > > > > >>>>>> the > > > > > >>>>>>>> partition has been added to this transaction. Of course, > > this > > > > > means > > > > > >>>>> we > > > > > >>>>>>>> won't completely cover the case where we have a really > late > > > > > message > > > > > >>>>> and > > > > > >>>>>>> we > > > > > >>>>>>>> have added the partition to the new transaction, but > that's > > > > > >>>>>> unfortunately > > > > > >>>>>>>> something we will need the new clients to cover. 
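To show why a never-completed transaction is harmful, as described above: read_committed consumers can only read up to the last stable offset, which cannot move past the first offset of an open transaction, so a dangling transactional record blocks them indefinitely. A simplified model, not the broker's actual LSO computation:

import java.util.Optional;

// Sketch only: with an open transaction, the LSO is pinned at its first offset;
// otherwise it follows the high watermark.
public final class LastStableOffsetSketch {

    static long lastStableOffset(long highWatermark, Optional<Long> firstOffsetOfOpenTxn) {
        return firstOffsetOfOpenTxn.map(first -> Math.min(first, highWatermark))
                                   .orElse(highWatermark);
    }

    public static void main(String[] args) {
        System.out.println(lastStableOffset(500L, Optional.empty()));  // 500
        System.out.println(lastStableOffset(500L, Optional.of(120L))); // 120: stuck until a marker is written
    }
}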
> > > > > >>>>>>>> > > > > > >>>>>>>> > > > > > >>>>>>>> 30) > > > > > >>>>>>>> > > > > > >>>>>>>> Transaction is ongoing = partition was added to > transaction > > > via > > > > > >>>>>>>> addPartitionsToTxn. We check this with the > > > DescribeTransactions > > > > > >>> call. > > > > > >>>>>> Let > > > > > >>>>>>>> me know if this wasn't sufficiently explained here: > > > > > >>>>>>>> > > > > > >>>>>>>> > > > > > >>>>>>> > > > > > >>>>>> > > > > > >>>>> > > > > > >>> > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense#KIP890:TransactionsServerSideDefense-EnsureOngoingTransactionforOlderClients(3) > > > > > >>>>>>>> > > > > > >>>>>>>> > > > > > >>>>>>>> 40) > > > > > >>>>>>>> > > > > > >>>>>>>> The idea here is that if any messages somehow come in > before > > > we > > > > > get > > > > > >>>>> the > > > > > >>>>>>> new > > > > > >>>>>>>> epoch to the producer, they will be fenced. However, if we > > > don't > > > > > >>>>> think > > > > > >>>>>>> this > > > > > >>>>>>>> is necessary, it can be discussed > > > > > >>>>>>>> > > > > > >>>>>>>> > > > > > >>>>>>>> 50) > > > > > >>>>>>>> > > > > > >>>>>>>> It should be synchronous because if we have an event (ie, > an > > > > > error) > > > > > >>>>>> that > > > > > >>>>>>>> causes us to need to abort the transaction, we need to > know > > > > which > > > > > >>>>>>>> partitions to send transaction markers to. We know the > > > > partitions > > > > > >>>>>> because > > > > > >>>>>>>> we added them to the coordinator via the > addPartitionsToTxn > > > > call. > > > > > >>>>>>>> Previously we have had asynchronous calls in the past (ie, > > > > writing > > > > > >>>>> the > > > > > >>>>>>>> commit markers when the transaction is completed) but > often > > > this > > > > > >>> just > > > > > >>>>>>>> causes confusion as we need to wait for some operations to > > > > > complete. > > > > > >>>>> In > > > > > >>>>>>> the > > > > > >>>>>>>> writing commit markers case, clients often see > > > > > >>>>> CONCURRENT_TRANSACTIONs > > > > > >>>>>>>> error messages and that can be confusing. For that reason, > > it > > > > may > > > > > be > > > > > >>>>>>>> simpler to just have synchronous calls — especially if we > > need > > > > to > > > > > >>>>> block > > > > > >>>>>>> on > > > > > >>>>>>>> some operation's completion anyway before we can start the > > > next > > > > > >>>>>>>> transaction. And yes, I meant coordinator. I will fix > that. > > > > > >>>>>>>> > > > > > >>>>>>>> > > > > > >>>>>>>> 60) > > > > > >>>>>>>> > > > > > >>>>>>>> When we are checking if the transaction is ongoing, we > need > > to > > > > > make > > > > > >>> a > > > > > >>>>>>> round > > > > > >>>>>>>> trip from the leader partition to the transaction > > coordinator. > > > > In > > > > > >>> the > > > > > >>>>>>> time > > > > > >>>>>>>> we are waiting for this message to come back, in theory we > > > could > > > > > >>> have > > > > > >>>>>>> sent > > > > > >>>>>>>> a commit/abort call that would make the original result of > > the > > > > > check > > > > > >>>>>> out > > > > > >>>>>>> of > > > > > >>>>>>>> date. That is why we can check the leader state before we > > > write > > > > to > > > > > >>>>> the > > > > > >>>>>>> log. > > > > > >>>>>>>> > > > > > >>>>>>>> > > > > > >>>>>>>> I'm happy to update the KIP if some of these things were > not > > > > > clear. > > > > > >>>>>>>> Thanks, > > > > > >>>>>>>> Justine > > > > > >>>>>>>> > > > > > >>>>>>>> On Mon, Nov 21, 2022 at 7:11 PM Matthias J. 
Sax < > > > > mj...@apache.org > > > > > > > > > > > >>>>>>> wrote: > > > > > >>>>>>>> > > > > > >>>>>>>>> Thanks for the KIP. > > > > > >>>>>>>>> > > > > > >>>>>>>>> Couple of clarification questions (I am not a broker > expert > > > do > > > > > >>>>> maybe > > > > > >>>>>>>>> some question are obvious for others, but not for me with > > my > > > > lack > > > > > >>>>> of > > > > > >>>>>>>>> broker knowledge). > > > > > >>>>>>>>> > > > > > >>>>>>>>> > > > > > >>>>>>>>> > > > > > >>>>>>>>> (10) > > > > > >>>>>>>>> > > > > > >>>>>>>>>> The delayed message case can also violate EOS if the > > delayed > > > > > >>>>>> message > > > > > >>>>>>>>> comes in after the next addPartitionsToTxn request comes > > in. > > > > > >>>>>>> Effectively > > > > > >>>>>>>> we > > > > > >>>>>>>>> may see a message from a previous (aborted) transaction > > > become > > > > > part > > > > > >>>>>> of > > > > > >>>>>>>> the > > > > > >>>>>>>>> next transaction. > > > > > >>>>>>>>> > > > > > >>>>>>>>> What happens if the message come in before the next > > > > > >>>>>> addPartitionsToTxn > > > > > >>>>>>>>> request? It seems the broker hosting the data partitions > > > won't > > > > > know > > > > > >>>>>>>>> anything about it and append it to the partition, too? > What > > > is > > > > > the > > > > > >>>>>>>>> difference between both cases? > > > > > >>>>>>>>> > > > > > >>>>>>>>> Also, it seems a TX would only hang, if there is no > > following > > > > TX > > > > > >>>>> that > > > > > >>>>>>> is > > > > > >>>>>>>>> either committer or aborted? Thus, for the case above, > the > > TX > > > > > might > > > > > >>>>>>>>> actually not hang (of course, we might get an EOS > violation > > > if > > > > > the > > > > > >>>>>>> first > > > > > >>>>>>>>> TX was aborted and the second committed, or the other way > > > > > around). > > > > > >>>>>>>>> > > > > > >>>>>>>>> > > > > > >>>>>>>>> (20) > > > > > >>>>>>>>> > > > > > >>>>>>>>>> Of course, 1 and 2 require client-side changes, so for > > older > > > > > >>>>>> clients, > > > > > >>>>>>>>> those approaches won’t apply. > > > > > >>>>>>>>> > > > > > >>>>>>>>> For (1) I understand why a client change is necessary, > but > > > not > > > > > sure > > > > > >>>>>> why > > > > > >>>>>>>>> we need a client change for (2). Can you elaborate? -- > > Later > > > > you > > > > > >>>>>>> explain > > > > > >>>>>>>>> that we should send a DescribeTransactionRequest, but I > am > > > not > > > > > sure > > > > > >>>>>>> why? > > > > > >>>>>>>>> Can't we not just do an implicit AddPartiitonToTx, too? > If > > > the > > > > > old > > > > > >>>>>>>>> producer correctly registered the partition already, the > > > > > >>>>>> TX-coordinator > > > > > >>>>>>>>> can just ignore it as it's an idempotent operation? > > > > > >>>>>>>>> > > > > > >>>>>>>>> > > > > > >>>>>>>>> (30) > > > > > >>>>>>>>> > > > > > >>>>>>>>>> To cover older clients, we will ensure a transaction is > > > > ongoing > > > > > >>>>>>> before > > > > > >>>>>>>>> we write to a transaction > > > > > >>>>>>>>> > > > > > >>>>>>>>> Not sure what you mean by this? Can you elaborate? > > > > > >>>>>>>>> > > > > > >>>>>>>>> > > > > > >>>>>>>>> (40) > > > > > >>>>>>>>> > > > > > >>>>>>>>>> [the TX-coordinator] will write the prepare commit > message > > > > with > > > > > a > > > > > >>>>>>>> bumped > > > > > >>>>>>>>> epoch and send WriteTxnMarkerRequests with the bumped > > epoch. > > > > > >>>>>>>>> > > > > > >>>>>>>>> Why do we use the bumped epoch for both? 
It seems more > > > > intuitive > > > > > to > > > > > >>>>>> use > > > > > >>>>>>>>> the current epoch, and only return the bumped epoch to > the > > > > > >>>>> producer? > > > > > >>>>>>>>> > > > > > >>>>>>>>> > > > > > >>>>>>>>> (50) "Implicit AddPartitionToTransaction" > > > > > >>>>>>>>> > > > > > >>>>>>>>> Why does the implicitly sent request need to be > > synchronous? > > > > The > > > > > >>>>> KIP > > > > > >>>>>>>>> also says > > > > > >>>>>>>>> > > > > > >>>>>>>>>> in case we need to abort and need to know which > partitions > > > > > >>>>>>>>> > > > > > >>>>>>>>> What do you mean by this? > > > > > >>>>>>>>> > > > > > >>>>>>>>> > > > > > >>>>>>>>>> we don’t want to write to it before we store in the > > > > transaction > > > > > >>>>>>> manager > > > > > >>>>>>>>> > > > > > >>>>>>>>> Do you mean TX-coordinator instead of "manager"? > > > > > >>>>>>>>> > > > > > >>>>>>>>> > > > > > >>>>>>>>> (60) > > > > > >>>>>>>>> > > > > > >>>>>>>>> For older clients and ensuring that the TX is ongoing, > you > > > > > >>>>> describe a > > > > > >>>>>>>>> race condition. I am not sure if I can follow here. Can > you > > > > > >>>>>> elaborate? > > > > > >>>>>>>>> > > > > > >>>>>>>>> > > > > > >>>>>>>>> > > > > > >>>>>>>>> -Matthias > > > > > >>>>>>>>> > > > > > >>>>>>>>> > > > > > >>>>>>>>> > > > > > >>>>>>>>> On 11/18/22 1:21 PM, Justine Olshan wrote: > > > > > >>>>>>>>>> Hey all! > > > > > >>>>>>>>>> > > > > > >>>>>>>>>> I'd like to start a discussion on my proposal to add > some > > > > > >>>>>> server-side > > > > > >>>>>>>>>> checks on transactions to avoid hanging transactions. I > > know > > > > > this > > > > > >>>>>> has > > > > > >>>>>>>>> been > > > > > >>>>>>>>>> an issue for some time, so I really hope this KIP will > be > > > > > helpful > > > > > >>>>>> for > > > > > >>>>>>>>> many > > > > > >>>>>>>>>> users of EOS. > > > > > >>>>>>>>>> > > > > > >>>>>>>>>> The KIP includes changes that will be compatible with > old > > > > > clients > > > > > >>>>>> and > > > > > >>>>>>>>>> changes to improve performance and correctness on new > > > clients. > > > > > >>>>>>>>>> > > > > > >>>>>>>>>> Please take a look and leave any comments you may have! > > > > > >>>>>>>>>> > > > > > >>>>>>>>>> KIP: > > > > > >>>>>>>>>> > > > > > >>>>>>>>> > > > > > >>>>>>>> > > > > > >>>>>>> > > > > > >>>>>> > > > > > >>>>> > > > > > >>> > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense > > > > > >>>>>>>>>> JIRA: https://issues.apache.org/jira/browse/KAFKA-14402 > > > > > >>>>>>>>>> > > > > > >>>>>>>>>> Thanks! > > > > > >>>>>>>>>> Justine > > > > > >>>>>>>>>> > > > > > >>>>>>>>> > > > > > >>>>>>>> > > > > > >>>>>>> > > > > > >>>>>> > > > > > >>>>> > > > > > >>>> > > > > > >>> > > > > > >> > > > > > > > > > > > > > > > > > > > > >