Thanks for taking a look, Matthias. I've tried to answer your questions below:

10) Right, so the hanging transaction only occurs when a late message comes in and the partition is never added to a transaction again. If we never add the partition to a transaction, we will never write a marker and never advance the LSO. If we do end up adding the partition to the transaction (I suppose this can happen before or after the late message comes in), then we will include the late message in the next (incorrect) transaction. So perhaps it is clearer to make the distinction between messages that eventually get added to a transaction (but the wrong one) and messages that never get added and become hanging.

20) The client-side change for 2 is removing the addPartitionsToTxn call. We don't need to make this call from the producer to the txn coordinator, only server side.

In my opinion, the issue with the addPartitionsToTxn call for older clients is that we don't have the epoch bump, so we don't know if the message belongs to the previous transaction or this one. We need to check if the partition has been added to this transaction. Of course, this means we won't completely cover the case where we have a really late message and we have added the partition to the new transaction, but that's unfortunately something we will need the new clients to cover.

30) "Transaction is ongoing" means the partition was added to the transaction via addPartitionsToTxn. We check this with the DescribeTransactions call. Let me know if this wasn't sufficiently explained here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense#KIP890:TransactionsServerSideDefense-EnsureOngoingTransactionforOlderClients(3)

40) The idea here is that if any messages somehow come in before we get the new epoch to the producer, they will be fenced. However, if we don't think this is necessary, it can be discussed.

50) It should be synchronous because if we have an event (e.g., an error) that causes us to need to abort the transaction, we need to know which partitions to send transaction markers to. We know the partitions because we added them to the coordinator via the addPartitionsToTxn call.
We have had asynchronous calls in the past (e.g., writing the commit markers when the transaction is completed), but often this just causes confusion as we need to wait for some operations to complete. In the commit-marker case, clients often see CONCURRENT_TRANSACTIONS errors, and that can be confusing. For that reason, it may be simpler to just have synchronous calls, especially if we need to block on some operation's completion anyway before we can start the next transaction. And yes, I meant coordinator. I will fix that.

60) When we are checking if the transaction is ongoing, we need to make a round trip from the partition leader to the transaction coordinator. In the time we are waiting for this message to come back, in theory we could have sent a commit/abort call that would make the original result of the check out of date. That is why we can check the leader state before we write to the log.

I'm happy to update the KIP if some of these things were not clear.
Thanks,
Justine

On Mon, Nov 21, 2022 at 7:11 PM Matthias J. Sax <mj...@apache.org> wrote:

> Thanks for the KIP.
>
> Couple of clarification questions (I am not a broker expert so maybe
> some questions are obvious for others, but not for me with my lack of
> broker knowledge).
>
> (10)
>
> > The delayed message case can also violate EOS if the delayed message
> > comes in after the next addPartitionsToTxn request comes in.
> > Effectively we may see a message from a previous (aborted) transaction
> > become part of the next transaction.
>
> What happens if the message comes in before the next addPartitionsToTxn
> request?
> It seems the broker hosting the data partitions won't know
> anything about it and append it to the partition, too? What is the
> difference between both cases?
>
> Also, it seems a TX would only hang if there is no following TX that is
> either committed or aborted? Thus, for the case above, the TX might
> actually not hang (of course, we might get an EOS violation if the first
> TX was aborted and the second committed, or the other way around).
>
> (20)
>
> > Of course, 1 and 2 require client-side changes, so for older clients,
> > those approaches won’t apply.
>
> For (1) I understand why a client change is necessary, but not sure why
> we need a client change for (2). Can you elaborate? -- Later you explain
> that we should send a DescribeTransactionRequest, but I am not sure why?
> Can't we just do an implicit AddPartitionToTx, too? If the old
> producer correctly registered the partition already, the TX-coordinator
> can just ignore it as it's an idempotent operation?
>
> (30)
>
> > To cover older clients, we will ensure a transaction is ongoing before
> > we write to a transaction
>
> Not sure what you mean by this? Can you elaborate?
>
> (40)
>
> > [the TX-coordinator] will write the prepare commit message with a
> > bumped epoch and send WriteTxnMarkerRequests with the bumped epoch.
>
> Why do we use the bumped epoch for both? It seems more intuitive to use
> the current epoch, and only return the bumped epoch to the producer?
>
> (50) "Implicit AddPartitionToTransaction"
>
> Why does the implicitly sent request need to be synchronous? The KIP
> also says
>
> > in case we need to abort and need to know which partitions
>
> What do you mean by this?
>
> > we don’t want to write to it before we store in the transaction manager
>
> Do you mean TX-coordinator instead of "manager"?
>
> (60)
>
> For older clients and ensuring that the TX is ongoing, you describe a
> race condition. I am not sure if I can follow here. Can you elaborate?
>
> -Matthias
>
> On 11/18/22 1:21 PM, Justine Olshan wrote:
> > Hey all!
> >
> > I'd like to start a discussion on my proposal to add some server-side
> > checks on transactions to avoid hanging transactions. I know this has
> > been an issue for some time, so I really hope this KIP will be helpful
> > for many users of EOS.
> >
> > The KIP includes changes that will be compatible with old clients and
> > changes to improve performance and correctness on new clients.
> >
> > Please take a look and leave any comments you may have!
> >
> > KIP:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense
> > JIRA: https://issues.apache.org/jira/browse/KAFKA-14402
> >
> > Thanks!
> > Justine
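
For readers following the thread, the hanging case from (10) and the ongoing-transaction check from (30) can be sketched with a toy model. This is not Kafka code: ToyCoordinator, ToyPartition, and run are hypothetical names, the LSO rule is simplified, and the coordinator lookup is only a stand-in for the DescribeTransactions round trip. Epochs and the race described in (60) are deliberately left out.

```python
from dataclasses import dataclass, field


@dataclass
class ToyCoordinator:
    """Hypothetical stand-in for the transaction coordinator."""
    # producer_id -> partitions added to that producer's ongoing transaction
    ongoing: dict = field(default_factory=dict)

    def add_partition(self, producer_id, partition):
        self.ongoing.setdefault(producer_id, set()).add(partition)

    def end_txn(self, producer_id):
        # Returns the partitions that must receive a commit/abort marker.
        return self.ongoing.pop(producer_id, set())

    def is_ongoing(self, producer_id, partition):
        return partition in self.ongoing.get(producer_id, set())


@dataclass
class ToyPartition:
    """Hypothetical partition log that tracks open transactions."""
    name: str
    log: list = field(default_factory=list)
    # producer_id -> offset of the first record in its open transaction
    open_txn_start: dict = field(default_factory=dict)

    def append(self, producer_id, value, coordinator=None):
        # When a coordinator is supplied, reject writes for partitions
        # that are not part of an ongoing transaction.
        if coordinator is not None and not coordinator.is_ongoing(producer_id, self.name):
            return False
        self.open_txn_start.setdefault(producer_id, len(self.log))
        self.log.append((producer_id, value))
        return True

    def write_marker(self, producer_id):
        self.open_txn_start.pop(producer_id, None)
        self.log.append((producer_id, "MARKER"))

    def last_stable_offset(self):
        # Simplified: start of the earliest open transaction, else log end.
        return min(self.open_txn_start.values(), default=len(self.log))


def run(verify):
    """Abort a transaction, then deliver a late message from it."""
    coord = ToyCoordinator()
    part = ToyPartition("topic-0")
    check = coord if verify else None

    coord.add_partition(1, part.name)
    part.append(1, "m1", check)
    if part.name in coord.end_txn(1):  # markers go only to added partitions
        part.write_marker(1)

    accepted = part.append(1, "late", check)  # late message after the marker
    return accepted, part.last_stable_offset(), len(part.log)
```

Without the check, the late message is appended after the marker and the LSO stays behind the log end until another marker is written for that producer, which never happens if the partition is not added to a transaction again. With the check, the late message is rejected and the LSO reaches the log end. The sketch does not cover a late message that arrives after the partition has been added to the next transaction, which is the case (20) says the new clients are needed for.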