Hi Daniel,

Thanks for reporting the issue and for the investigation.
I'm curious: what's your current workaround for this issue?

I agree with Artem; it makes sense. Please file a bug in JIRA.
Looking forward to your PR! :)

Thank you.
Luke

On Thu, Jul 7, 2022 at 3:07 AM Artem Livshits
<alivsh...@confluent.io.invalid> wrote:

> Hi Daniel,
>
> What you say makes sense.  Could you file a bug and put this info there so
> that it's easier to track?
>
> -Artem
>
> On Wed, Jul 6, 2022 at 8:34 AM Dániel Urbán <urb.dani...@gmail.com> wrote:
>
> > Hello everyone,
> >
> > I've been investigating some transaction-related issues in a very
> > problematic cluster. Besides finding some interesting issues, I had some
> > ideas about how transactional producer behavior could be improved.
> >
> > My suggestion, in short: when the transactional producer encounters an
> > error which doesn't necessarily mean that the in-flight request was
> > processed (for example a client-side timeout), the producer should not
> > send an EndTxnRequest on abort, but should instead bump the producer
> > epoch.
> >
> > Here is the longer description of the issue I found, and how I arrived
> > at the suggestion:
> >
> > First, the description of the issue. When I say that the cluster is "very
> > problematic", I mean all kinds of different issues, be it infra (disks and
> > network) or throughput (high-volume producers without fine tuning).
> > In this cluster, Kafka transactions are widely used by many producers, and
> > partitions get "stuck" frequently (a few times every week).
> >
> > The exact meaning of a partition being "stuck" is this:
> >
> > On the client side (a rough code sketch of this sequence follows the list):
> > 1. A transactional producer sends X batches to a partition in a single
> > transaction
> > 2. Out of the X batches, the last few get sent, but time out due to the
> > delivery timeout config
> > 3. producer.flush() is unblocked due to all batches being "finished"
> > 4. Based on the errors reported in the producer.send() callback,
> > producer.abortTransaction() is called
> > 5. Then producer.close() is also invoked with a 5s timeout (this
> > application does not reuse the producer instances optimally)
> > 6. The transactional.id of the producer is never reused (it was randomly
> > generated)
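> >
> > To make that sequence concrete, the application logic looks roughly like
> > this (a minimal sketch; the config values, topic name, and batchCount are
> > illustrative, not the exact application code):
> >
> >     import org.apache.kafka.clients.producer.*;
> >     import org.apache.kafka.common.serialization.StringSerializer;
> >     import java.time.Duration;
> >     import java.util.Properties;
> >     import java.util.UUID;
> >     import java.util.concurrent.atomic.AtomicBoolean;
> >
> >     Properties props = new Properties();
> >     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
> >     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
> >     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
> >     // transactional.id is randomly generated and never reused
> >     props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString());
> >     props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");
> >
> >     KafkaProducer<String, String> producer = new KafkaProducer<>(props);
> >     producer.initTransactions();
> >     producer.beginTransaction();
> >
> >     AtomicBoolean sendFailed = new AtomicBoolean(false);
> >     int batchCount = 10; // the "X batches" above
> >     for (int i = 0; i < batchCount; i++) {
> >         producer.send(new ProducerRecord<>("my-topic", "key-" + i, "value-" + i),
> >             (metadata, exception) -> {
> >                 // the last few batches fail here with a client-side TimeoutException
> >                 if (exception != null) {
> >                     sendFailed.set(true);
> >                 }
> >             });
> >     }
> >     producer.flush(); // unblocks once every batch is "finished", including timed-out ones
> >     if (sendFailed.get()) {
> >         // sends an EndTxnRequest even though some batches may still be in flight
> >         producer.abortTransaction();
> >     }
> >     producer.close(Duration.ofSeconds(5));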
> >
> > On the partition leader side (what appears in the log segment of the
> > partition):
> > 1. The batches sent by the producer are all appended to the log
> > 2. But the ABORT marker of the transaction was appended before the last 1
> > or 2 batches of the transaction
> >
> > On the transaction coordinator side (what appears in the transaction
> > state partition):
> > The transactional.id is present with the Empty state.
> >
> > This results in the following:
> > 1. The partition leader handles the first batch after the ABORT marker as
> > the first message of a new transaction of the same producer id + epoch.
> > (The LSO is blocked at this point.)
> > 2. The transaction coordinator is not aware of any in-progress transaction
> > of the producer, and thus never aborts the transaction, not even after
> > transaction.timeout.ms passes.
> >
> > This is happening with Kafka 2.5 running in the cluster; producer versions
> > range between 2.0 and 2.6.
> > I scanned through a lot of tickets, and I believe that this issue is not
> > specific to these versions and could happen with the newest versions as
> > well. If I'm mistaken, some pointers would be appreciated.
> >
> > Assuming that the issue could occur with any version, I believe this issue
> > boils down to one oversight on the client side:
> > When a request fails without a definitive response (e.g. a delivery
> > timeout), the client cannot assume that the request is "finished" and
> > simply abort the transaction. If the request is still in flight while the
> > EndTxnRequest, and then the WriteTxnMarkerRequest, get sent and processed
> > first, the client has violated the contract.
> > This could be avoided by providing more information to the partition
> > leader. Right now, a new transactional batch signals the start of a new
> > transaction, and there is no way for the partition leader to decide
> > whether the batch is an out-of-order message.
> > In a naive and wasteful protocol, we could have a unique transaction id
> > added to each batch and marker, meaning that the leader would be capable
> > of refusing batches which arrive after the control marker of the
> > transaction. But instead of changing the log format and the protocol, we
> > can achieve the same by bumping the producer epoch.
> >
> > Bumping the epoch has a similar effect to "changing the transaction id":
> > the in-progress transaction will be aborted with a bumped producer epoch,
> > telling the partition leader about the producer epoch change. From this
> > point on, any batches sent with the old epoch will be refused by the
> > leader due to the fencing mechanism. It doesn't really matter how many
> > batches get appended to the log and how many get refused - this is an
> > aborted transaction anyway - but the out-of-order message cannot occur,
> > and cannot block the LSO indefinitely.
> >
> > My suggestion is that the TransactionManager inside the producer should
> > keep track of what type of errors were encountered by the batches of the
> > transaction, and categorize them along the lines of "definitely completed"
> > and "might not be completed". When the transaction goes into an abortable
> > state, and there is at least one batch with "might not be completed", the
> > EndTxnRequest should be skipped, and an epoch bump should be sent instead.
> > As for what type of error counts as "might not be completed", I can only
> > think of client-side timeouts.
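> >
> > In rough pseudo-Java terms, I imagine the abort path working along these
> > lines (the method and field names below are made up for illustration and
> > are not the actual TransactionManager internals):
> >
> >     enum BatchOutcome { DEFINITELY_COMPLETED, MIGHT_NOT_BE_COMPLETED }
> >
> >     // recorded per batch when its produce result (or lack of one) is known
> >     BatchOutcome classify(Exception error) {
> >         // a client-side timeout means we don't know whether the broker
> >         // appended the batch, so the request must not be assumed finished
> >         if (error instanceof org.apache.kafka.common.errors.TimeoutException) {
> >             return BatchOutcome.MIGHT_NOT_BE_COMPLETED;
> >         }
> >         return BatchOutcome.DEFINITELY_COMPLETED;
> >     }
> >
> >     void abortTransaction() {
> >         if (anyBatchHasOutcome(BatchOutcome.MIGHT_NOT_BE_COMPLETED)) {
> >             // skip the EndTxnRequest and bump the epoch instead, so that the
> >             // coordinator aborts the open transaction with the new epoch and
> >             // the partition leaders fence any late batches of the old epoch
> >             bumpEpochAndAbort();
> >         } else {
> >             sendEndTxnRequest(TransactionResult.ABORT);
> >         }
> >     }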
> >
> > I believe this is a relatively small change (it only affects the client
> > lib), but it helps avoid some corrupt states in Kafka transactions.
> >
> > Looking forward to your input. If it seems like a sane idea, I'll go
> > ahead and submit a PR for it as well.
> >
> > Thanks in advance,
> > Daniel
> >
>
