Kyle Kingsbury created KAFKA-17754:
--------------------------------------

             Summary: A delayed EndTxn message can cause aborted read, lost 
writes, atomicity violation
                 Key: KAFKA-17754
                 URL: https://issues.apache.org/jira/browse/KAFKA-17754
             Project: Kafka
          Issue Type: Bug
          Components: clients, producer, protocol
    Affects Versions: 3.8.0
            Reporter: Kyle Kingsbury
         Attachments: g1a-trace.svg

In short: I believe that both an internal retry mechanism and guidance from the 
example code and client API docs are independently capable of causing committed 
transactions to actually abort, aborted transactions to actually commit, and 
transactions to be split into multiple parts with different fates. A delayed 
EndTxn message could arrive seconds later and decide the fate of an unrelated 
transaction.

Consider the attached Lamport diagram, reconstructed from node logs and packet 
captures in a recent Jepsen test. In it, a single process, using a single 
producer and consumer, executes a series of transactions which all commit or 
abort cleanly. Process 76 selected the unique transactional ID `jt1234` on 
initialization.

From packet captures and debug logs, we see `jt1234` used producer ID `233`, 
submitted all four operations, then sent an EndTxn message with `committed = 
false`, which denotes a transaction abort. However, fifteen separate calls to 
`poll` observed this transaction's write of `424` to key `5`---an obvious case 
of aborted read (G1a). Even stranger, *no* poller observed the other writes 
from this transaction: key `17` apparently never received values `926` or 
`927`. Why?
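
For context, the pollers here are ordinary `read_committed` consumers, roughly 
along the lines of the sketch below. The bootstrap servers, group ID, topic 
name, and serialization are assumptions, not the actual test configuration.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Poller {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "n1:9092");   // assumption
        props.put("group.id", "pollers");            // assumption
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.LongDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.LongDeserializer");
        // read_committed is supposed to hide writes from aborted transactions.
        props.put("isolation.level", "read_committed");

        try (KafkaConsumer<Long, Long> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("test-topic")); // hypothetical topic name
            for (ConsumerRecord<Long, Long> r : consumer.poll(Duration.ofSeconds(1))) {
                // Fifteen such polls nevertheless returned key 5 -> 424, a write
                // from the transaction the producer aborted.
                System.out.printf("key %d -> %d%n", r.key(), r.value());
            }
        }
    }
}
```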

Close inspection of the packet capture, combined with Bufstream's logs, allowed 
us to reconstruct what happened. Process 76 began a transaction which sent 
`1018` to key `15`. It sent an `EndTxn` message to commit that transaction to 
node `n3`. However, it did not receive a prompt response. The client then 
quietly sent a *second* commit message to `n4`, which succeeded; the test 
harness's call to `commitTransaction` returned normally. The 
process then performed and intentionally aborted a second transaction; this
completed OK. So far, so good.
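
For reference, the producer side of that first transaction looks roughly like 
this. This is a sketch: the bootstrap servers, topic name, and serializers are 
assumptions; the transactional ID, producer ID, and values come from the trace.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class FirstTxn {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "n1:9092,n3:9092,n4:9092"); // assumption
        props.put("transactional.id", "jt1234");                   // from the trace
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.LongSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.LongSerializer");

        try (KafkaProducer<Long, Long> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();   // per the trace, this acquired producer ID 233
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("test-topic", 15L, 1018L)); // "sent 1018 to key 15"
            // A single commitTransaction() call; under the hood the client put
            // two EndTxn(commit) messages on the wire after the first timed out.
            producer.commitTransaction();
        }
    }
}
```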

Then process 76 began our problematic transaction. It sent `424` to key `5`, 
and added new partitions to the transaction. Just after accepting record `424`, 
node `n3` received the delayed commit message from two transactions previously. 
It committed the current transaction, effectively chopping it in half. The 
first half (record `424`) was committed and visible to pollers. The second 
half, sending `926` and `927` to key `17`, implicitly began a second 
transaction, which was aborted by the client.

This suggests a fundamental problem in the Kafka transaction protocol. The 
protocol is [intentionally designed](https://kafka.apache.org/protocol) to 
allow clients to submit requests over multiple TCP connections and to 
distribute them across multiple nodes. There is no sequence number to order 
requests from the same client. There is no concept of a transaction number. 
When a server receives a commit (or abort) message, it has no way to know what 
transaction the client intended to commit. It simply commits or aborts whatever
transaction happens to be in progress.
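
Concretely, an EndTxn request carries only the fields below (per the protocol 
documentation); sketched here as an illustrative Java record rather than the 
actual generated request class.

```java
// Illustrative only: the payload of an EndTxn request. There is no transaction
// number and no per-request sequence; the coordinator resolves "which
// transaction?" purely from whatever is currently in progress for this
// (transactional_id, producer_id, producer_epoch).
record EndTxnRequestSketch(
    String transactionalId, // e.g. "jt1234"
    long producerId,        // e.g. 233
    short producerEpoch,
    boolean committed       // true = commit, false = abort
) {}
```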

This means transactions which appeared to commit could actually abort, and vice 
versa: we observed both aborted reads and lost writes. It also means 
transactions could get chopped into smaller pieces: one could lose some, but 
not all, of a transaction's effects.

What does it take to get this behavior? First, an `EndTxn` message must be 
delayed---for instance due to network latency, packet loss, a slow computer, 
garbage collection, etc. Second, while that `EndTxn` arrow is hovering in the 
air, the client needs to move on to perform a second transaction using the same 
producer ID and epoch. There are several ways this could happen.

First, users could explicitly retry committing or aborting a transaction. The 
docs say they can, and the client won't stop them.
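
For instance, nothing stops an application-level retry loop like the following 
hypothetical helper (retrying only on `TimeoutException` is my assumption); 
each attempt is another EndTxn(commit) that may arrive late.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.errors.TimeoutException;

class ExplicitRetry {
    // The docs permit retrying a commit that timed out, so a user might loop.
    static void commitWithRetry(KafkaProducer<?, ?> producer) {
        while (true) {
            try {
                producer.commitTransaction();
                return;
            } catch (TimeoutException e) {
                // Indefinite result; try again. Every retry is a fresh EndTxn.
            }
        }
    }
}
```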

Second, the official Kafka Java client docs repeatedly instruct users to call 
`abortTransaction` if an error occurs during `commitTransaction`. The provided 
example code leads directly to this behavior: if `commitTransaction` times out, 
it calls `abortTransaction`, and voilà: the client can move on to later 
operations. The only exceptions the docs exempt from this advice are 
`ProducerFencedException`, `OutOfOrderSequenceException`, and 
`AuthorizationException`, none of which apply here.
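
Paraphrasing the error-handling pattern from the `KafkaProducer` javadoc 
example (setup elided; the topic and record here are placeholders):

```java
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("test-topic", 5L, 424L)); // placeholder
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal errors: the docs say the only option is to close the producer.
    producer.close();
} catch (KafkaException e) {
    // Anything else, including a commit timeout, and the docs say to abort.
    // If the timed-out EndTxn(commit) is still in flight, it is now free to
    // decide the fate of a *later* transaction on the same producer ID and epoch.
    producer.abortTransaction();
}
```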

I've tried to avoid this problem by ensuring that transactions either commit 
once, or abort once, never both. Sadly, this doesn't work. Indeed, process 76 
in this test run *never* tried to abort a transaction after calling 
commit---and even though it called `commitTransaction` only once, it sent *two* 
commit messages to two different nodes. I suspect this is because the Java 
client treats timeouts as retriable 
(https://github.com/apache/kafka/blob/8125c3da5bb6ebb35a0cb3494624d33fad4e3187/clients/src/main/java/org/apache/kafka/common/errors/TimeoutException.java#L22),
 and the transaction manager appears to perform its own internal retries.
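
Rendered as a hypothetical Java sketch (the actual harness is not shown here; 
the names and structure are invented), the guard looks like this:

```java
import org.apache.kafka.clients.producer.KafkaProducer;

// Hypothetical guard: decide each transaction's fate exactly once on the
// client side, never calling both commit and abort.
class OnceOnlyTxn {
    enum Fate { UNDECIDED, COMMITTED, ABORTED }
    private Fate fate = Fate.UNDECIDED;

    void commit(KafkaProducer<?, ?> producer) {
        if (fate != Fate.UNDECIDED) throw new IllegalStateException("fate already decided");
        fate = Fate.COMMITTED;
        // Even this single call can put two EndTxn(commit) messages on the wire,
        // because the client treats the timeout as retriable and retries internally.
        producer.commitTransaction();
    }

    void abort(KafkaProducer<?, ?> producer) {
        if (fate != Fate.UNDECIDED) throw new IllegalStateException("fate already decided");
        fate = Fate.ABORTED;
        producer.abortTransaction();
    }
}
```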

I'm not entirely sure how to fix this--the protocol feels like it's missing 
critical ordering information, and you can't rely on e.g. TCP ordering, because 
it's multi-stream. One option might be to force the producer to acquire a new 
epoch if it ever encounters an indefinite result from an EndTxn message--then 
the producer fencing mechanism would prevent any delayed EndTxn messages from 
being processed, right?
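
A user-level approximation of that idea might look like the sketch below: on an 
indefinite `commitTransaction` result, discard the producer and re-initialize 
under the same `transactional.id`, so the epoch bump fences any EndTxn still in 
flight from the old epoch. This is a guess at a mitigation, not a verified fix; 
whether `close()` behaves well with a commit still pending, and whether fencing 
catches every delayed message, are open questions.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.errors.TimeoutException;

class ReinitOnIndefiniteCommit {
    private final Properties props;          // must include transactional.id
    private KafkaProducer<Long, Long> producer;

    ReinitOnIndefiniteCommit(Properties props) {
        this.props = props;
        this.producer = new KafkaProducer<>(props);
        this.producer.initTransactions();
    }

    void commit() {
        try {
            producer.commitTransaction();
        } catch (TimeoutException e) {
            // Indefinite: the commit may or may not have taken effect. Don't
            // retry and don't abort; fence the old epoch instead.
            producer.close();
            producer = new KafkaProducer<>(props);
            producer.initTransactions();     // acquires a higher epoch for this transactional.id
        }
    }
}
```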


