Hi Daniel,

Thanks for reporting the issue and for the investigation. I'm curious: what is your workaround for this issue?
I agree with Artem, it makes sense. Please file a bug in JIRA. And looking forward to your PR! :)

Thank you.
Luke

On Thu, Jul 7, 2022 at 3:07 AM Artem Livshits <alivsh...@confluent.io.invalid> wrote:

> Hi Daniel,
>
> What you say makes sense. Could you file a bug and put this info there so that it's easier to track?
>
> -Artem
>
> On Wed, Jul 6, 2022 at 8:34 AM Dániel Urbán <urb.dani...@gmail.com> wrote:
>
> > Hello everyone,
> >
> > I've been investigating some transaction-related issues in a very problematic cluster. Besides finding some interesting issues, I had some ideas about how the transactional producer's behavior could be improved.
> >
> > My suggestion in short: when the transactional producer encounters an error which doesn't necessarily mean that the in-flight request was processed (for example a client-side timeout), the producer should not send an EndTxnRequest on abort, but should instead bump the producer epoch.
> >
> > The long description of the issue I found, and how I came to the suggestion:
> >
> > First, the description of the issue. When I say that the cluster is "very problematic", I mean all kinds of different issues, be it infra (disks and network) or throughput (high-volume producers without fine-tuning). In this cluster, Kafka transactions are widely used by many producers, and partitions get "stuck" frequently (a few times every week).
> >
> > The exact meaning of a partition being "stuck" is this:
> >
> > On the client side (a minimal code sketch of this sequence is included further below):
> > 1. A transactional producer sends X batches to a partition in a single transaction
> > 2. Out of the X batches, the last few get sent, but time out due to the delivery timeout config
> > 3. producer.flush() is unblocked because all batches are "finished"
> > 4. Based on the errors reported in the producer.send() callback, producer.abortTransaction() is called
> > 5. Then producer.close() is also invoked with a 5s timeout (this application does not reuse the producer instances optimally)
> > 6. The transactional.id of the producer is never reused (it was randomly generated)
> >
> > On the partition leader side (what appears in the log segment of the partition):
> > 1. The batches sent by the producer are all appended to the log
> > 2. But the ABORT marker of the transaction was appended before the last 1 or 2 batches of the transaction
> >
> > On the transaction coordinator side (what appears in the transaction state partition): the transactional.id is present with the Empty state.
> >
> > This results in the following:
> > 1. The partition leader handles the first batch after the ABORT marker as the first message of a new transaction of the same producer id + epoch (the LSO is blocked at this point)
> > 2. The transaction coordinator is not aware of any in-progress transaction of the producer, so it never aborts the transaction, not even after transaction.timeout.ms passes
> >
> > This is happening with Kafka 2.5 running in the cluster; producer versions range between 2.0 and 2.6. I scanned through a lot of tickets, and I believe that this issue is not specific to these versions and could happen with the newest versions as well. If I'm mistaken, some pointers would be appreciated.
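> >
> > To make the client-side sequence above concrete, here is a minimal, self-contained sketch of the usage pattern that triggers it. The topic name, partition, bootstrap servers, timeouts and message count are made up for illustration (this is not our real application code), but the API calls are the standard producer API:
> >
> >     import java.time.Duration;
> >     import java.util.Properties;
> >     import java.util.UUID;
> >     import java.util.concurrent.atomic.AtomicBoolean;
> >
> >     import org.apache.kafka.clients.producer.KafkaProducer;
> >     import org.apache.kafka.clients.producer.ProducerConfig;
> >     import org.apache.kafka.clients.producer.ProducerRecord;
> >     import org.apache.kafka.common.serialization.StringSerializer;
> >
> >     public class StuckPartitionScenario {
> >         public static void main(String[] args) {
> >             Properties props = new Properties();
> >             props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> >             props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
> >             props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
> >             // Step 6: randomly generated transactional.id, never reused
> >             props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txn-" + UUID.randomUUID());
> >             // Short timeouts so the last batches can expire on the client side (step 2)
> >             props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5_000);
> >             props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 10_000);
> >
> >             AtomicBoolean sendFailed = new AtomicBoolean(false);
> >             KafkaProducer<String, String> producer = new KafkaProducer<>(props);
> >             try {
> >                 producer.initTransactions();
> >                 producer.beginTransaction();
> >
> >                 // Step 1: send X batches to one partition in a single transaction
> >                 for (int i = 0; i < 1000; i++) {
> >                     producer.send(new ProducerRecord<>("my-topic", 0, null, "payload-" + i),
> >                             (metadata, exception) -> {
> >                                 // Step 2: the last few batches may fail with a client-side
> >                                 // delivery timeout while their produce request is still in flight
> >                                 if (exception != null) {
> >                                     sendFailed.set(true);
> >                                 }
> >                             });
> >                 }
> >
> >                 // Step 3: flush() unblocks once every batch is "finished" (acked or expired)
> >                 producer.flush();
> >
> >                 if (sendFailed.get()) {
> >                     // Step 4: abort based on the callback errors; the resulting EndTxnRequest
> >                     // and WriteTxnMarkerRequest can race with a produce request still in flight
> >                     producer.abortTransaction();
> >                 } else {
> >                     producer.commitTransaction();
> >                 }
> >             } finally {
> >                 // Step 5: close with a 5s timeout, the producer instance is not reused
> >                 producer.close(Duration.ofSeconds(5));
> >             }
> >         }
> >     }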
> >
> > Assuming that the issue could occur with any version, I believe it boils down to one oversight on the client side: when a request fails without a definitive response (e.g. a delivery timeout), the client cannot assume that the request is "finished" and simply abort the transaction. If the request is still in flight, and the EndTxnRequest and then the WriteTxnMarkerRequest get sent and processed before it, the contract is violated by the client.
> >
> > This could be avoided by providing more information to the partition leader. Right now, a new transactional batch signals the start of a new transaction, and there is no way for the partition leader to decide whether the batch is an out-of-order message. In a naive and wasteful protocol, we could add a unique transaction id to each batch and marker, meaning that the leader would be capable of refusing batches which arrive after the control marker of the transaction. But instead of changing the log format and the protocol, we can achieve the same by bumping the producer epoch.
> >
> > Bumping the epoch has a similar effect to "changing the transaction id" - the in-progress transaction will be aborted with a bumped producer epoch, telling the partition leader about the epoch change. From this point on, any batches sent with the old epoch will be refused by the leader due to the fencing mechanism. It doesn't really matter how many batches get appended to the log and how many are refused - this is an aborted transaction - but the out-of-order message cannot occur, and cannot block the LSO indefinitely.
> >
> > My suggestion is that the TransactionManager inside the producer should keep track of what type of errors were encountered by the batches of the transaction, and categorize them along the lines of "definitely completed" and "might not be completed". When the transaction goes into an abortable state and there is at least one "might not be completed" batch, the EndTxnRequest should be skipped and an epoch bump should be sent instead. As for what type of error counts as "might not be completed", I can only think of client-side timeouts. (A rough sketch of this bookkeeping is included below my signature.)
> >
> > I believe this is a relatively small change (it only affects the client lib), but it helps in avoiding some corrupt states in Kafka transactions.
> >
> > Looking forward to your input. If it seems like a sane idea, I'll go ahead and submit a PR for it as well.
> >
> > Thanks in advance,
> > Daniel
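> >
> > P.S. A rough, hypothetical sketch of the categorization I have in mind. The class, enum and method names are invented for illustration and do not correspond to the actual TransactionManager internals; the real change would live inside the producer's transaction bookkeeping:
> >
> >     // Hypothetical sketch only - not the real TransactionManager code.
> >     import java.util.EnumSet;
> >     import java.util.Set;
> >
> >     public class AbortDecisionSketch {
> >
> >         enum BatchOutcome {
> >             ACKED,             // broker responded, batch is in the log
> >             REJECTED,          // broker responded with a definite error
> >             CLIENT_TIMED_OUT   // delivery.timeout.ms expired, fate of the request unknown
> >         }
> >
> >         enum AbortAction { SEND_END_TXN_ABORT, BUMP_EPOCH }
> >
> >         // Outcomes that leave the produce request possibly still in flight
> >         private static final Set<BatchOutcome> MIGHT_NOT_BE_COMPLETED =
> >                 EnumSet.of(BatchOutcome.CLIENT_TIMED_OUT);
> >
> >         private final Set<BatchOutcome> outcomesInCurrentTxn = EnumSet.noneOf(BatchOutcome.class);
> >
> >         // Called once per batch as its result becomes known
> >         void recordBatchOutcome(BatchOutcome outcome) {
> >             outcomesInCurrentTxn.add(outcome);
> >         }
> >
> >         // On abort: if every batch is definitely completed, a plain EndTxnRequest(abort)
> >         // is safe; otherwise bump the producer epoch so any late batch carrying the old
> >         // epoch is fenced by the partition leader.
> >         AbortAction decideAbortAction() {
> >             boolean anyUncertain = outcomesInCurrentTxn.stream()
> >                     .anyMatch(MIGHT_NOT_BE_COMPLETED::contains);
> >             return anyUncertain ? AbortAction.BUMP_EPOCH : AbortAction.SEND_END_TXN_ABORT;
> >         }
> >     }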