Hi, Justine,

Thanks for the reply.

70. Assigning a new pid on int overflow seems a bit hacky. If we need a txn-level id, it would be better to model this explicitly. Adding a new field would require a bit more work since it requires a new txn marker format in the log. So, we probably need to guard it with an IBP or metadata version and document the impact on downgrade once the new format is written to the log.

71. Hmm, once the marker is written, the partition will expect the next append to be on the next epoch. Does that cover the case you mentioned?

72. Also, I want to be clear on the stuck message issue described in the motivation. With EoS, we also validate the sequence id for idempotency. So, with the current logic, if the producer epoch hasn't been bumped on the broker, it seems that the stuck message will fail the sequence validation and will be ignored. If the producer epoch has been bumped, we ignore the sequence check and the stuck message could be appended to the log. So, is the latter the case that we want to guard against?

Thanks,

Jun

On Wed, Dec 14, 2022 at 10:44 AM Justine Olshan <jols...@confluent.io.invalid> wrote:

> Matthias, thanks again for taking the time to look at this. You said:
>
>> My proposal was only focused on avoiding dangling transactions if records are added without a registered partition. -- Maybe you can add a few more details to the KIP about this scenario for better documentation purposes?
>
> I'm not sure I understand what you mean here. The motivation section describes two scenarios about how the record can be added without a registered partition:
>
>> This can happen when a message gets stuck or delayed due to networking issues or a network partition, the transaction aborts, and then the delayed message finally comes in.
>
>> Another way hanging transactions can occur is that a client is buggy and may somehow try to write to a partition before it adds the partition to the transaction.
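The two scenarios quoted above can be illustrated with a deliberately simplified model of one partition's transactional state. This is a sketch under stated assumptions, not broker code: `PartitionTxnState`, `HangingTxnDemo`, and the `registered` set (standing in for the coordinator's view of which producers have added this partition to an ongoing transaction) are hypothetical names. It shows how a batch arriving after the abort marker, with no later marker to close it, leaves an open transaction on the partition, and how a verify-before-append check avoids that.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical demo of the "late batch after abort" case; not broker code.
final class HangingTxnDemo {
    public static void main(String[] args) {
        long pid = 42L;

        // No verification: the txn aborts, then a delayed batch from it arrives
        // before the partition is added to any new transaction.
        PartitionTxnState plain = new PartitionTxnState();
        plain.append(pid, Set.of(pid), false);
        plain.writeMarker(pid);              // abort marker
        plain.append(pid, Set.of(), false);  // late batch opens a txn no marker will close
        System.out.println(plain.hasOpenTxn(pid));

        // Verify-before-append: the same late batch is rejected.
        PartitionTxnState checked = new PartitionTxnState();
        checked.append(pid, Set.of(pid), true);
        checked.writeMarker(pid);
        System.out.println(checked.append(pid, Set.of(), true));
        System.out.println(checked.hasOpenTxn(pid));
    }
}

// Hypothetical, heavily simplified view of one partition: producerId -> first offset of its open txn.
final class PartitionTxnState {
    private final Map<Long, Long> openTxnFirstOffset = new HashMap<>();
    private long logEndOffset = 0;

    // Appends one transactional batch. With verify set, the batch is rejected unless the
    // (hypothetical) coordinator view says this producer registered the partition.
    boolean append(long producerId, Set<Long> registered, boolean verify) {
        if (verify && !registered.contains(producerId)) {
            return false;
        }
        openTxnFirstOffset.putIfAbsent(producerId, logEndOffset);
        logEndOffset++;
        return true;
    }

    // A commit or abort marker closes the producer's open transaction on this partition.
    void writeMarker(long producerId) {
        openTxnFirstOffset.remove(producerId);
        logEndOffset++; // the marker occupies an offset as well
    }

    boolean hasOpenTxn(long producerId) {
        return openTxnFirstOffset.containsKey(producerId);
    }
}
```

Running `main` prints `true`, then `false`, then `false`. Per the thread, older clients would get the check through a DescribeTransactions call and newer clients through an implicit addPartitionsToTxn sent by the leader; the sketch collapses both into the single `verify` flag.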
>
> For the first example, would it be helpful to say that this message comes in after the abort, but before the partition is added to the next transaction, so it becomes "hanging"? Perhaps the next sentence, describing the message becoming part of the next transaction (a different case), was not properly differentiated.
>
> Jun, thanks for reading the KIP.
>
> 70. The int typing was a concern. Currently we have a mechanism in place to fence the final epoch when the epoch is about to overflow and assign a new producer ID with epoch 0. Of course, this is a bit tricky when it comes to the response back to the client.
> Making this a long could be another option, but I wonder whether there are any implications of changing this field if the epoch is persisted to disk. I'd need to check the usages.
>
> 71. This was something Matthias asked about as well. I was considering a possible edge case where a produce request from a new transaction somehow gets sent right after the marker is written, but before the producer is alerted to the newly bumped epoch. In this case, we may include this record when we don't want to. I suppose we could try to do something client side to bump the epoch after sending an endTxn as well in this scenario, but I wonder how it would work when the server is aborting based on a server-side error. I could also be missing something and this scenario is actually not possible.
>
> Thanks again to everyone reading and commenting. Let me know about any further questions or comments.
>
> Justine
>
> On Wed, Dec 14, 2022 at 9:41 AM Jun Rao <j...@confluent.io.invalid> wrote:
>
>> Hi, Justine,
>>
>> Thanks for the KIP. A couple of comments.
>>
>> 70. Currently, the producer epoch is an int. I am not sure if it's enough to accommodate all transactions in the lifetime of a producer. Should we change that to a long or add a new long field like txnId?
>>
>> 71. "it will write the prepare commit message with a bumped epoch and send WriteTxnMarkerRequests with the bumped epoch." Hmm, the epoch is associated with the current txn, right? So, it seems weird to write a commit message with a bumped epoch. Should we only bump up the epoch in EndTxnResponse and rename the field to something like nextProducerEpoch?
>>
>> Thanks,
>>
>> Jun
>>
>> On Mon, Dec 12, 2022 at 8:54 PM Matthias J. Sax <mj...@apache.org> wrote:
>>
>>> Thanks for the background.
>>>
>>> 20/30: SGTM. My proposal was only focused on avoiding dangling transactions if records are added without a registered partition. -- Maybe you can add a few more details to the KIP about this scenario for better documentation purposes?
>>>
>>> 40: I think you hit a fair point about race conditions or client bugs (incorrectly not bumping the epoch). The complexity/confusion I see with using the bumped epoch is mainly for internal debugging, i.e., inspecting log segment dumps -- it seems harder for us humans to reason about the system. But if we get better guarantees, it would be worth using the bumped epoch.
>>>
>>> 60: As I mentioned already, I don't know the broker internals well enough to provide more input. So if nobody else chimes in, we should just move forward with your proposal.
>>>
>>> -Matthias
>>>
>>> On 12/6/22 4:22 PM, Justine Olshan wrote:
>>>> Hi all,
>>>> After Artem's questions about error behavior, I've re-evaluated the unknown producer ID exception and had some discussions offline.
>>>>
>>>> I think generally it makes sense to simplify error handling in cases like this, and the UNKNOWN_PRODUCER_ID error has a pretty long and complicated history.
>>>> Because of this, I propose adding a new error code ABORTABLE_ERROR that, when encountered by new clients (gated by the produce request version), will simply abort the transaction. This allows the server to have some say in whether the client aborts and makes handling much simpler. In the future, we can also use this error in other situations where we want to abort the transactions. We can even use it on other APIs.
>>>>
>>>> I've added this to the KIP. Let me know if there are any questions or issues.
>>>>
>>>> Justine
>>>>
>>>> On Fri, Dec 2, 2022 at 10:22 AM Justine Olshan <jols...@confluent.io> wrote:
>>>>
>>>>> Hey Matthias,
>>>>>
>>>>> 20/30: Maybe I also didn't express myself clearly. For older clients we don't have a way to distinguish between a previous and the current transaction since we don't have the epoch bump. This means that a late message from the previous transaction may be added to the new one. With older clients, we can't guarantee this won't happen if we already sent the addPartitionsToTxn call (which is why we make changes for the newer client), but we can at least gate some by ensuring that the partition has been added to the transaction. The rationale here is that there are likely FEWER late arrivals as time goes on, so hopefully most late arrivals will come in BEFORE the addPartitionsToTxn call. Those that arrive before will be properly gated with the describeTransactions approach.
>>>>>
>>>>> If we take the approach you suggested, ANY late arrival from a previous transaction will be added. And we don't want that. I also don't see any benefit in sending addPartitionsToTxn over the describeTxns call.
>>>>> They will both be one extra RPC to the Txn coordinator.
>>>>>
>>>>> To be clear, newer clients will use addPartitionsToTxn instead of the DescribeTxns.
>>>>>
>>>>> 40)
>>>>> My concern is that if we have some delay in the client to bump the epoch, it could continue to send epoch 73 and those records would not be fenced. Perhaps this is not an issue if we don't allow the next produce to go through before the EndTxn request returns. I'm also thinking about cases of failure. I will need to think on this a bit.
>>>>>
>>>>> I wasn't sure if it was that confusing. But if we think it is, we can investigate other ways.
>>>>>
>>>>> 60)
>>>>>
>>>>> I'm not sure these are the same purgatories since one is a produce purgatory (I was planning on using a callback rather than purgatory) and the other is simply a request to append to the log. Not sure we have any structure here for ordering, but my understanding is that the broker could handle the write request before it hears back from the Txn Coordinator.
>>>>>
>>>>> Let me know if I misunderstood something or something was unclear.
>>>>>
>>>>> Justine
>>>>>
>>>>> On Thu, Dec 1, 2022 at 12:15 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>>>
>>>>>> Thanks for the details, Justine!
>>>>>>
>>>>>>> 20)
>>>>>>>
>>>>>>> The client side change for 2 is removing the addPartitions to transaction call. We don't need to make this from the producer to the txn coordinator, only server side.
>>>>>>
>>>>>> I think I did not express myself clearly. I understand that we can (and should) change the producer to not send the `addPartitions` request any longer. But I don't think it's a requirement to change the broker?
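Point 60 above is that the coordinator's answer can be out of date by the time it arrives, because a commit or abort marker may be written to the partition while the leader waits. A minimal sketch of "re-check the leader state before writing", assuming (hypothetically) that the leader-side state is a per-producer count of markers written. The KIP may track different state, and `VerifyThenAppend` and its methods are illustrative names only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: snapshot leader-side state before the coordinator round trip,
// then re-check it before appending.
final class VerifyThenAppend {
    private final Map<Long, Integer> markersWritten = new HashMap<>(); // producerId -> markers applied
    private int appendedBatches = 0;

    public static void main(String[] args) {
        VerifyThenAppend leader = new VerifyThenAppend();
        long pid = 42L;

        int snapshot = leader.beginVerification(pid);
        System.out.println(leader.completeVerification(pid, snapshot, true)); // no race

        snapshot = leader.beginVerification(pid);
        leader.onMarker(pid); // a commit/abort marker lands while we wait for the coordinator
        System.out.println(leader.completeVerification(pid, snapshot, true)); // stale answer
    }

    // Called when a transaction marker for this producer is written to the partition.
    void onMarker(long producerId) {
        markersWritten.merge(producerId, 1, Integer::sum);
    }

    // Step 1: remember the leader-side state before the asynchronous round trip.
    int beginVerification(long producerId) {
        return markersWritten.getOrDefault(producerId, 0);
    }

    // Step 2: the coordinator's answer arrives. Append only if it says the transaction is
    // ongoing AND no marker was written in the meantime; otherwise the answer may be stale.
    boolean completeVerification(long producerId, int snapshot, boolean coordinatorSaysOngoing) {
        boolean stale = markersWritten.getOrDefault(producerId, 0) != snapshot;
        if (!coordinatorSaysOngoing || stale) {
            return false;
        }
        appendedBatches++;
        return true;
    }

    int appendedBatches() {
        return appendedBatches;
    }
}
```

`main` prints `true` and then `false`. The sketch only models the re-check itself; it does not address whether ordering the requests in purgatory would make the re-check unnecessary.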
>>>>>>
>>>>>> What I am trying to say is: as a safeguard and improvement for older producers, the partition leader can just send the `addPartitions` request to the TX-coordinator in any case -- if the old producer correctly did send the `addPartition` request to the TX-coordinator already, the TX-coordinator can just "ignore" it as idempotent. However, if the old producer has a bug and forgot to send the `addPartition` request, we would now ensure that the partition is indeed added to the TX and thus fix a potential producer bug (even if we don't get the fencing via the bump epoch). -- It seems to be a good improvement? Or is there a reason to not do this?
>>>>>>
>>>>>>> 30)
>>>>>>>
>>>>>>> Transaction is ongoing = partition was added to transaction via addPartitionsToTxn. We check this with the DescribeTransactions call. Let me know if this wasn't sufficiently explained here:
>>>>>>
>>>>>> If we do what I propose in (20), we don't really need to make this `DescribeTransaction` call, as the partition leader adds the partition for older clients and we get this check for free.
>>>>>>
>>>>>>> 40)
>>>>>>>
>>>>>>> The idea here is that if any messages somehow come in before we get the new epoch to the producer, they will be fenced. However, if we don't think this is necessary, it can be discussed.
>>>>>>
>>>>>> I agree that we should have epoch fencing. My question is different: Assume we are at epoch 73, and we have an ongoing transaction that is committed. It seems natural to write the "prepare commit" marker and the `WriteTxnMarkerRequest` both with epoch 73, too, as it belongs to the current transaction.
>>>>>> Of course, we now also bump the epoch and expect the next requests to have epoch 74, and would reject a request with epoch 73, as the corresponding TX for epoch 73 was already committed.
>>>>>>
>>>>>> It seems you propose to write the "prepare commit marker" and `WriteTxnMarkerRequest` with epoch 74 though, which would work, but it seems confusing. Is there a reason why we would use the bumped epoch 74 instead of the current epoch 73?
>>>>>>
>>>>>>> 60)
>>>>>>>
>>>>>>> When we are checking if the transaction is ongoing, we need to make a round trip from the leader partition to the transaction coordinator. In the time we are waiting for this message to come back, in theory we could have sent a commit/abort call that would make the original result of the check out of date. That is why we can check the leader state before we write to the log.
>>>>>>
>>>>>> Thanks. Got it.
>>>>>>
>>>>>> However, is this really an issue? We put the produce request in purgatory, so how could we process the `WriteTxnMarkerRequest` first? Don't we need to put the `WriteTxnMarkerRequest` into purgatory, too, for this case, and process both requests in order? (Again, my broker knowledge is limited and maybe we don't maintain request order for this case, which seems to be an issue IMHO, and I am wondering if changing request handling to preserve order for this case might be the cleaner solution?)
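The epoch discussion in 40, 70 and 71 can be made concrete with a small sketch: the coordinator bumps the epoch when a transaction ends and rolls over to a new producer ID when the epoch is exhausted, and the partition fences any batch whose epoch is older than the one it learned from the marker. The epoch is a Java `short` because the producer epoch is an int16 field in the Kafka protocol. The exhaustion threshold, the ID allocator, and all class names (`Coordinator`, `PartitionFence`, `PidAndEpoch`) are assumptions for illustration, not Kafka code. The sketch follows the proposal of sending the bumped epoch with the marker; what actually fences the late epoch-73 batch is that the partition expects epoch 74 once the marker is written.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-transaction epoch bumps and fencing; not Kafka code.
final class EpochBumpDemo {
    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator(1000L, (short) 73);
        PartitionFence partition = new PartitionFence();

        System.out.println(partition.tryAppend(1000L, (short) 73)); // current txn, epoch 73
        PidAndEpoch next = coordinator.endTxn();                     // bump for the next txn
        partition.onMarker(next.producerId, next.epoch);             // marker carries epoch 74
        System.out.println(partition.tryAppend(1000L, (short) 73)); // late epoch-73 batch
        System.out.println(partition.tryAppend(next.producerId, next.epoch));
    }
}

final class PidAndEpoch {
    final long producerId;
    final short epoch;

    PidAndEpoch(long producerId, short epoch) {
        this.producerId = producerId;
        this.epoch = epoch;
    }
}

final class Coordinator {
    // Assumed threshold for illustration: the last epoch handed out before rolling over.
    static final short LAST_USABLE_EPOCH = Short.MAX_VALUE - 1;

    private long producerId;
    private short epoch;
    private long nextFreeProducerId; // hypothetical ID allocator

    Coordinator(long producerId, short epoch) {
        this.producerId = producerId;
        this.epoch = epoch;
        this.nextFreeProducerId = producerId + 1;
    }

    // Ends the current transaction and returns the (producerId, epoch) for the next one.
    PidAndEpoch endTxn() {
        if (epoch >= LAST_USABLE_EPOCH) {
            producerId = nextFreeProducerId++; // epoch exhausted: new producer ID, epoch 0
            epoch = 0;
        } else {
            epoch++;
        }
        return new PidAndEpoch(producerId, epoch);
    }
}

final class PartitionFence {
    private final Map<Long, Short> latestEpoch = new HashMap<>();

    // The marker tells the partition which epoch to expect from now on.
    void onMarker(long producerId, short epoch) {
        latestEpoch.merge(producerId, epoch, (a, b) -> (short) Math.max(a, b));
    }

    // Batches from an older epoch belong to a finished transaction and are fenced.
    boolean tryAppend(long producerId, short epoch) {
        Short known = latestEpoch.get(producerId);
        if (known != null && epoch < known) {
            return false;
        }
        latestEpoch.put(producerId, epoch);
        return true;
    }
}
```

`main` prints `true`, `false`, `true`. Whether the marker itself should carry epoch 73 or 74 (Matthias's and Jun's question) does not change the fencing shown here, only what appears in the log.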
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 11/30/22 3:28 PM, Artem Livshits wrote:
>>>>>>> Hi Justine,
>>>>>>>
>>>>>>> I think the interesting part is not in this logic (because it tries to figure out when UNKNOWN_PRODUCER_ID is retriable and if it's retriable, it's definitely not fatal), but what happens when this logic doesn't return 'true' and falls through. In the old clients it seems to be fatal; if we keep the behavior in the new clients, I'd expect it would be fatal as well.
>>>>>>>
>>>>>>> -Artem
>>>>>>>
>>>>>>> On Tue, Nov 29, 2022 at 11:57 AM Justine Olshan <jols...@confluent.io.invalid> wrote:
>>>>>>>
>>>>>>>> Hi Artem and Jeff,
>>>>>>>>
>>>>>>>> Thanks for taking a look and sorry for the slow response.
>>>>>>>>
>>>>>>>> You both mentioned the change to handle UNKNOWN_PRODUCER_ID errors. To be clear, this error code will only be sent again when the client's request version is high enough to ensure we handle it correctly.
>>>>>>>> The current (Java) client handles this with the following (somewhat long) code snippet:
>>>>>>>>
>>>>>>>>     // An UNKNOWN_PRODUCER_ID means that we have lost the producer state on the broker. Depending on the log start
>>>>>>>>     // offset, we may want to retry these, as described for each case below. If none of those apply, then for the
>>>>>>>>     // idempotent producer, we will locally bump the epoch and reset the sequence numbers of in-flight batches from
>>>>>>>>     // sequence 0, then retry the failed batch, which should now succeed. For the transactional producer, allow the
>>>>>>>>     // batch to fail. When processing the failed batch, we will transition to an abortable error and set a flag
>>>>>>>>     // indicating that we need to bump the epoch (if supported by the broker).
>>>>>>>>     if (error == Errors.UNKNOWN_PRODUCER_ID) {
>>>>>>>>         if (response.logStartOffset == -1) {
>>>>>>>>             // We don't know the log start offset with this response. We should just retry the request until we get it.
>>>>>>>>             // The UNKNOWN_PRODUCER_ID error code was added along with the new ProduceResponse which includes the
>>>>>>>>             // logStartOffset. So the '-1' sentinel is not for backward compatibility. Instead, it is possible for
>>>>>>>>             // a broker to not know the logStartOffset at when it is returning the response because the partition
>>>>>>>>             // may have moved away from the broker from the time the error was initially raised to the time the
>>>>>>>>             // response was being constructed. In these cases, we should just retry the request: we are guaranteed
>>>>>>>>             // to eventually get a logStartOffset once things settle down.
>>>>>>>>             return true;
>>>>>>>>         }
>>>>>>>>
>>>>>>>>         if (batch.sequenceHasBeenReset()) {
>>>>>>>>             // When the first inflight batch fails due to the truncation case, then the sequences of all the other
>>>>>>>>             // in flight batches would have been restarted from the beginning. However, when those responses
>>>>>>>>             // come back from the broker, they would also come with an UNKNOWN_PRODUCER_ID error. In this case, we should not
>>>>>>>>             // reset the sequence numbers to the beginning.
>>>>>>>>             return true;
>>>>>>>>         } else if (lastAckedOffset(batch.topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER) < response.logStartOffset) {
>>>>>>>>             // The head of the log has been removed, probably due to the retention time elapsing. In this case,
>>>>>>>>             // we expect to lose the producer state. For the transactional producer, reset the sequences of all
>>>>>>>>             // inflight batches to be from the beginning and retry them, so that the transaction does not need to
>>>>>>>>             // be aborted. For the idempotent producer, bump the epoch to avoid reusing (sequence, epoch) pairs
>>>>>>>>             if (isTransactional()) {
>>>>>>>>                 txnPartitionMap.startSequencesAtBeginning(batch.topicPartition, this.producerIdAndEpoch);
>>>>>>>>             } else {
>>>>>>>>                 requestEpochBumpForPartition(batch.topicPartition);
>>>>>>>>             }
>>>>>>>>             return true;
>>>>>>>>         }
>>>>>>>>
>>>>>>>>         if (!isTransactional()) {
>>>>>>>>             // For the idempotent producer, always retry UNKNOWN_PRODUCER_ID errors. If the batch has the current
>>>>>>>>             // producer ID and epoch, request a bump of the epoch. Otherwise just retry the produce.
>>>>>>>>             requestEpochBumpForPartition(batch.topicPartition);
>>>>>>>>             return true;
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>>
>>>>>>>> I was considering keeping this behavior, but am open to simplifying it.
>>>>>>>>
>>>>>>>> We are leaving changes to older clients off the table here since it caused many issues for clients in the past. Previously this was a fatal error and we didn't have the mechanisms in place to detect when this was a legitimate case vs some bug or gap in the protocol. Ensuring each transaction has its own epoch should close this gap.
>>>>>>>>
>>>>>>>> And to address Jeff's second point:
>>>>>>>>> does the typical produce request path append records to local log along with the currentTxnFirstOffset information? I would like to understand when the field is written to disk.
>>>>>>>>
>>>>>>>> Yes, the first produce request populates this field and writes the offset as part of the record batch and also to the producer state snapshot. When we reload the records on restart and/or reassignment, we repopulate this field with the snapshot from disk along with the rest of the producer state.
>>>>>>>>
>>>>>>>> Let me know if there are further comments and/or questions.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Justine
>>>>>>>>
>>>>>>>> On Tue, Nov 22, 2022 at 9:00 PM Jeff Kim <jeff....@confluent.io.invalid> wrote:
>>>>>>>>
>>>>>>>>> Hi Justine,
>>>>>>>>>
>>>>>>>>> Thanks for the KIP!
>>>>>>>>> I have two questions:
>>>>>>>>>
>>>>>>>>> 1) For new clients, we can once again return an error UNKNOWN_PRODUCER_ID for sequences that are non-zero when there is no producer state present on the server. This will indicate we missed the 0 sequence and we don't yet want to write to the log.
>>>>>>>>>
>>>>>>>>> I would like to understand the current behavior to handle older clients, and if there are any changes we are making. Maybe I'm missing something, but we would want to identify whether we missed the 0 sequence for older clients, no?
>>>>>>>>>
>>>>>>>>> 2) Upon returning from the transaction coordinator, we can set the transaction as ongoing on the leader by populating currentTxnFirstOffset through the typical produce request handling.
>>>>>>>>>
>>>>>>>>> Does the typical produce request path append records to local log along with the currentTxnFirstOffset information? I would like to understand when the field is written to disk.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Jeff
>>>>>>>>>
>>>>>>>>> On Tue, Nov 22, 2022 at 4:44 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Justine,
>>>>>>>>>>
>>>>>>>>>> Thank you for the KIP. I have one question.
>>>>>>>>>>
>>>>>>>>>> 5) For new clients, we can once again return an error UNKNOWN_PRODUCER_ID
>>>>>>>>>>
>>>>>>>>>> I believe we had problems in the past with returning UNKNOWN_PRODUCER_ID because it was considered fatal and required client restart. It would be good to spell out the new client behavior when it receives the error.
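Jeff's item 1 quotes the KIP rule for new clients: with no producer state on the broker, a non-zero first sequence means sequence 0 was missed and the batch should not be written. Artem asks how the client then reacts; Justine's later (Dec 6) reply proposes an ABORTABLE_ERROR code that new clients handle by aborting the transaction. A hypothetical sketch of both pieces: the enums and method names are illustrative (not Kafka's `Errors` class), and sequence checks against existing producer state are out of scope.

```java
// Hypothetical sketch; the enums and methods are illustrative, not Kafka's Errors class.
final class FirstSequenceCheck {
    enum ProduceError { NONE, UNKNOWN_PRODUCER_ID, ABORTABLE_ERROR }

    enum ClientAction { CONTINUE, ABORT_TRANSACTION, OTHER }

    // Broker side, per the KIP text Jeff quotes: with no producer state, a non-zero first
    // sequence means sequence 0 was missed, so the batch is not written to the log.
    // Sequence checks against existing producer state are not modeled here.
    static ProduceError checkFirstAppend(boolean hasProducerState, int firstSequence) {
        if (!hasProducerState && firstSequence != 0) {
            return ProduceError.UNKNOWN_PRODUCER_ID;
        }
        return ProduceError.NONE;
    }

    // Client side, per the later ABORTABLE_ERROR proposal: new clients abort the transaction.
    // How new clients treat UNKNOWN_PRODUCER_ID itself is left open here (OTHER).
    static ClientAction newClientReaction(ProduceError error) {
        switch (error) {
            case NONE:
                return ClientAction.CONTINUE;
            case ABORTABLE_ERROR:
                return ClientAction.ABORT_TRANSACTION;
            default:
                return ClientAction.OTHER;
        }
    }

    public static void main(String[] args) {
        System.out.println(checkFirstAppend(false, 5)); // missed sequence 0
        System.out.println(checkFirstAppend(false, 0)); // first batch of a new producer
        System.out.println(newClientReaction(ProduceError.ABORTABLE_ERROR));
    }
}
```

`main` prints `UNKNOWN_PRODUCER_ID`, `NONE`, and `ABORT_TRANSACTION`.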
>>>>>>>>>>
>>>>>>>>>> -Artem
>>>>>>>>>>
>>>>>>>>>> On Tue, Nov 22, 2022 at 10:00 AM Justine Olshan <jols...@confluent.io.invalid> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks for taking a look, Matthias. I've tried to answer your questions below:
>>>>>>>>>>>
>>>>>>>>>>> 10)
>>>>>>>>>>>
>>>>>>>>>>> Right, so the hanging transaction only occurs when we have a late message come in and the partition is never added to a transaction again. If we never add the partition to a transaction, we will never write a marker and never advance the LSO.
>>>>>>>>>>>
>>>>>>>>>>> If we do end up adding the partition to the transaction (I suppose this can happen before or after the late message comes in), then we will include the late message in the next (incorrect) transaction.
>>>>>>>>>>>
>>>>>>>>>>> So perhaps it is clearer to make the distinction between messages that eventually get added to the transaction (but the wrong one) and messages that never get added and become hanging.
>>>>>>>>>>>
>>>>>>>>>>> 20)
>>>>>>>>>>>
>>>>>>>>>>> The client side change for 2 is removing the addPartitions to transaction call. We don't need to make this from the producer to the txn coordinator, only server side.
>>>>>>>>>>>
>>>>>>>>>>> In my opinion, the issue with the addPartitionsToTxn call for older clients is that we don't have the epoch bump, so we don't know if the message belongs to the previous transaction or this one.
>>>>>>>>>>> We need to check if the partition has been added to this transaction. Of course, this means we won't completely cover the case where we have a really late message and we have added the partition to the new transaction, but that's unfortunately something we will need the new clients to cover.
>>>>>>>>>>>
>>>>>>>>>>> 30)
>>>>>>>>>>>
>>>>>>>>>>> Transaction is ongoing = partition was added to transaction via addPartitionsToTxn. We check this with the DescribeTransactions call. Let me know if this wasn't sufficiently explained here:
>>>>>>>>>>>
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense#KIP890:TransactionsServerSideDefense-EnsureOngoingTransactionforOlderClients(3)
>>>>>>>>>>>
>>>>>>>>>>> 40)
>>>>>>>>>>>
>>>>>>>>>>> The idea here is that if any messages somehow come in before we get the new epoch to the producer, they will be fenced. However, if we don't think this is necessary, it can be discussed.
>>>>>>>>>>>
>>>>>>>>>>> 50)
>>>>>>>>>>>
>>>>>>>>>>> It should be synchronous because if we have an event (i.e., an error) that causes us to need to abort the transaction, we need to know which partitions to send transaction markers to. We know the partitions because we added them to the coordinator via the addPartitionsToTxn call.
>>>>>>>>>>> We have had asynchronous calls in the past (i.e., writing the commit markers when the transaction is completed), but often this just causes confusion as we need to wait for some operations to complete. In the writing commit markers case, clients often see CONCURRENT_TRANSACTIONS error messages and that can be confusing. For that reason, it may be simpler to just have synchronous calls, especially if we need to block on some operation's completion anyway before we can start the next transaction. And yes, I meant coordinator. I will fix that.
>>>>>>>>>>>
>>>>>>>>>>> 60)
>>>>>>>>>>>
>>>>>>>>>>> When we are checking if the transaction is ongoing, we need to make a round trip from the leader partition to the transaction coordinator. In the time we are waiting for this message to come back, in theory we could have sent a commit/abort call that would make the original result of the check out of date. That is why we can check the leader state before we write to the log.
>>>>>>>>>>>
>>>>>>>>>>> I'm happy to update the KIP if some of these things were not clear.
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Justine
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Nov 21, 2022 at 7:11 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the KIP.
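Justine's answer to 50 is that the add must complete before data is written because, on abort, the coordinator sends markers only to the partitions it knows about. A hypothetical sketch with partitions as plain strings (`orders-0` and `payments-1` are made-up names, and `TxnPartitions` is not a Kafka class) shows what goes wrong if a partition receives data before its add is recorded:

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch; partitions are plain strings and the names are made up.
final class MarkerTargetsDemo {
    public static void main(String[] args) {
        TxnPartitions txn = new TxnPartitions();
        txn.addPartition("orders-0"); // add completed before the leader wrote data
        // Suppose "payments-1" was written to, but its add had not been recorded
        // when an error forced the coordinator to abort.
        Set<String> markerTargets = txn.abort();
        System.out.println(markerTargets);                        // partitions that get an abort marker
        System.out.println(markerTargets.contains("payments-1")); // no marker: that data would hang
    }
}

// Hypothetical coordinator-side record of the partitions in one transaction.
final class TxnPartitions {
    private final Set<String> partitions = new LinkedHashSet<>();

    // Idempotent: re-adding an already registered partition changes nothing.
    boolean addPartition(String topicPartition) {
        return partitions.add(topicPartition);
    }

    // On abort, markers go only to the partitions the coordinator knows about.
    Set<String> abort() {
        Set<String> targets = new LinkedHashSet<>(partitions);
        partitions.clear();
        return targets;
    }
}
```

`main` prints `[orders-0]` and `false`. Making the add synchronous closes this gap, and the idempotent `addPartition` reflects Matthias's point that a redundant add from an old producer is harmless.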
>>>>>>>>>>>>
>>>>>>>>>>>> A couple of clarification questions (I am not a broker expert, so maybe some questions are obvious for others, but not for me with my lack of broker knowledge).
>>>>>>>>>>>>
>>>>>>>>>>>> (10)
>>>>>>>>>>>>
>>>>>>>>>>>>> The delayed message case can also violate EOS if the delayed message comes in after the next addPartitionsToTxn request comes in. Effectively we may see a message from a previous (aborted) transaction become part of the next transaction.
>>>>>>>>>>>>
>>>>>>>>>>>> What happens if the message comes in before the next addPartitionsToTxn request? It seems the broker hosting the data partitions won't know anything about it and will append it to the partition, too? What is the difference between both cases?
>>>>>>>>>>>>
>>>>>>>>>>>> Also, it seems a TX would only hang if there is no following TX that is either committed or aborted? Thus, for the case above, the TX might actually not hang (of course, we might get an EOS violation if the first TX was aborted and the second committed, or the other way around).
>>>>>>>>>>>>
>>>>>>>>>>>> (20)
>>>>>>>>>>>>
>>>>>>>>>>>>> Of course, 1 and 2 require client-side changes, so for older clients, those approaches won’t apply.
>>>>>>>>>>>>
>>>>>>>>>>>> For (1) I understand why a client change is necessary, but I am not sure why we need a client change for (2). Can you elaborate?
>>>>>>>>>>>> -- Later you explain that we should send a DescribeTransactionRequest, but I am not sure why? Can't we just do an implicit AddPartitionToTx, too? If the old producer correctly registered the partition already, the TX-coordinator can just ignore it as it's an idempotent operation?
>>>>>>>>>>>>
>>>>>>>>>>>> (30)
>>>>>>>>>>>>
>>>>>>>>>>>>> To cover older clients, we will ensure a transaction is ongoing before we write to a transaction
>>>>>>>>>>>>
>>>>>>>>>>>> Not sure what you mean by this? Can you elaborate?
>>>>>>>>>>>>
>>>>>>>>>>>> (40)
>>>>>>>>>>>>
>>>>>>>>>>>>> [the TX-coordinator] will write the prepare commit message with a bumped epoch and send WriteTxnMarkerRequests with the bumped epoch.
>>>>>>>>>>>>
>>>>>>>>>>>> Why do we use the bumped epoch for both? It seems more intuitive to use the current epoch, and only return the bumped epoch to the producer?
>>>>>>>>>>>>
>>>>>>>>>>>> (50) "Implicit AddPartitionToTransaction"
>>>>>>>>>>>>
>>>>>>>>>>>> Why does the implicitly sent request need to be synchronous? The KIP also says
>>>>>>>>>>>>
>>>>>>>>>>>>> in case we need to abort and need to know which partitions
>>>>>>>>>>>>
>>>>>>>>>>>> What do you mean by this?
>>>>>>>>>>>>
>>>>>>>>>>>>> we don’t want to write to it before we store in the transaction manager
>>>>>>>>>>>>
>>>>>>>>>>>> Do you mean TX-coordinator instead of "manager"?
>>>>>>>>>>>>
>>>>>>>>>>>> (60)
>>>>>>>>>>>>
>>>>>>>>>>>> For older clients and ensuring that the TX is ongoing, you describe a race condition.
>>>>>>>>>>>> I am not sure if I can follow here. Can you elaborate?
>>>>>>>>>>>>
>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>
>>>>>>>>>>>> On 11/18/22 1:21 PM, Justine Olshan wrote:
>>>>>>>>>>>>> Hey all!
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'd like to start a discussion on my proposal to add some server-side checks on transactions to avoid hanging transactions. I know this has been an issue for some time, so I really hope this KIP will be helpful for many users of EOS.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The KIP includes changes that will be compatible with old clients and changes to improve performance and correctness on new clients.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Please take a look and leave any comments you may have!
>>>>>>>>>>>>>
>>>>>>>>>>>>> KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense
>>>>>>>>>>>>> JIRA: https://issues.apache.org/jira/browse/KAFKA-14402
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>> Justine
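Justine's reply to (10) in her Nov 22 message notes that a hanging transaction never gets a marker, so the LSO never advances. A small sketch of why, assuming the usual definition of the last stable offset: the minimum of the high watermark and the first offset of any still-open transaction, with read_committed consumers reading only below it. `LastStableOffsetDemo` and `lastStableOffset` are hypothetical names, not broker code.

```java
import java.util.Collection;
import java.util.List;

// Hypothetical helper illustrating how an open (or hanging) transaction pins the LSO.
final class LastStableOffsetDemo {
    // The LSO is the smaller of the high watermark and the first offset of the
    // earliest still-open transaction on the partition.
    static long lastStableOffset(long highWatermark, Collection<Long> openTxnFirstOffsets) {
        long lso = highWatermark;
        for (long first : openTxnFirstOffsets) {
            lso = Math.min(lso, first);
        }
        return lso;
    }

    public static void main(String[] args) {
        // No open transactions: the LSO follows the high watermark.
        System.out.println(lastStableOffset(500L, List.of()));
        // A hanging transaction that started at offset 120 pins the LSO there,
        // no matter how far the high watermark moves.
        System.out.println(lastStableOffset(500L, List.of(120L)));
        System.out.println(lastStableOffset(9000L, List.of(120L)));
    }
}
```

`main` prints `500`, `120`, `120`: until a marker closes the transaction that started at offset 120, read_committed consumers cannot progress past it.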