Justine Olshan created KAFKA-20090:
--------------------------------------
Summary: TV2 can allow ongoing transactions with max epoch that never complete
Key: KAFKA-20090
URL: https://issues.apache.org/jira/browse/KAFKA-20090
Project: Kafka
Issue Type: Task
Reporter: Justine Olshan
Since transaction version 2 (TV2) was introduced, the producer epoch is bumped on
every transaction.
The original EndTransaction logic accounts for retries. Because the epoch is
bumped when a transaction ends, we wanted to be careful not to fence a producer
that is only retrying. As a result, an EndTransaction request is treated as a
retry when the stored epoch has already been bumped past the request's epoch.
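A minimal sketch of this kind of retry check, to make the rule concrete. It is a simplified model and not the coordinator's actual code: the `TxnState` enum, the `RetryCheck.isEndTxnRetry` helper, and the assumed rule that a retry needs a completed state plus a stored epoch exactly one above the request's epoch are all hypothetical.

{code:java}
// Hypothetical, simplified model of TV2 EndTxn retry detection (not Kafka's code).
enum TxnState { ONGOING, COMPLETE_COMMIT, COMPLETE_ABORT }

final class RetryCheck {
    // Assumed rule: with TV2 the coordinator bumps the epoch when it ends a
    // transaction, so a retried EndTxn still carries the epoch from before the bump.
    static boolean isEndTxnRetry(long requestPid, short requestEpoch,
                                 long storedPid, short storedEpoch, TxnState state) {
        boolean completed = state == TxnState.COMPLETE_COMMIT
                || state == TxnState.COMPLETE_ABORT;
        return completed && requestPid == storedPid && storedEpoch == requestEpoch + 1;
    }

    public static void main(String[] args) {
        // Stored epoch 6 after the bump: a request at epoch 5 looks like a retry.
        System.out.println(isEndTxnRetry(10L, (short) 5, 10L, (short) 6, TxnState.COMPLETE_ABORT));
        // A request already at the stored epoch is not a retry of the previous EndTxn.
        System.out.println(isEndTxnRetry(10L, (short) 6, 10L, (short) 6, TxnState.COMPLETE_ABORT));
    }
}
{code}

Note that nothing in a check of this shape tells apart an epoch bumped by a normal completion from one bumped by a fencing abort.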
When a retry is identified, the original logic returns the producer ID and epoch
currently stored in the transaction metadata. The normal end-transaction case at
max epoch - 1 was considered and handled: the state there is safe to return to
the producer.
However, we did not consider fencing epoch bumps at max epoch - 1. In that case
we also bump the epoch (to max epoch), but we do not create a new producer ID
and epoch. Instead, the producer is expected to be fenced and call
InitProducerId.
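The difference between the two kinds of bump at max epoch - 1 can be sketched as follows. This is a hypothetical model under stated assumptions: `PidEpoch`, `EpochBump`, and the `nextPid` argument (standing in for the next producer ID the coordinator would hand out) are illustrative names, not Kafka APIs.

{code:java}
// Hypothetical model of the two ways an epoch at max epoch - 1 gets bumped (not Kafka's code).
record PidEpoch(long producerId, short epoch) {}

final class EpochBump {
    static final short MAX_EPOCH = Short.MAX_VALUE;

    // Normal TV2 completion: bumping from max epoch - 1 would exhaust the epoch,
    // so the producer moves to a new producer ID starting at epoch 0.
    static PidEpoch completeNormally(long pid, short epoch, long nextPid) {
        if (epoch == MAX_EPOCH - 1) {
            return new PidEpoch(nextPid, (short) 0);
        }
        return new PidEpoch(pid, (short) (epoch + 1));
    }

    // Fencing bump (e.g. abort on transaction timeout): same producer ID, epoch + 1,
    // even when that lands on MAX_EPOCH. Assumes epoch < MAX_EPOCH on entry.
    // The producer is expected to be fenced and call InitProducerId, so this
    // state is not meant to be handed back to it.
    static PidEpoch fence(long pid, short epoch) {
        return new PidEpoch(pid, (short) (epoch + 1));
    }

    public static void main(String[] args) {
        short atBoundary = (short) (MAX_EPOCH - 1);
        PidEpoch normal = completeNormally(10L, atBoundary, 11L);
        PidEpoch fenced = fence(10L, atBoundary);
        System.out.println(normal.producerId() + " " + normal.epoch()); // 11 0
        System.out.println(fenced.producerId() + " " + fenced.epoch()); // 10 32767
    }
}
{code}

Only the first result is safe to return to a retrying producer; the second is the state the retry logic ends up returning in the race below.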
If a transaction timeout races with an EndTransaction(ABORT) request sent at max
epoch - 1, we can treat the EndTransaction request as a "retry" and return max
epoch as the producer's current epoch instead of fencing the producer:
1. The fencing abort triggered by the transaction timeout bumps the epoch to max
epoch.
2. The EndTxn request with max epoch - 1 is considered a "retry", and we return
max epoch.
3. The producer can start a new transaction, since we don't check epochs when
starting transactions.
4. With TV2 we can neither commit this transaction nor time it out. It is stuck
in Ongoing forever.
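The four steps can be replayed against a toy coordinator. Everything here is a hypothetical simplification, not Kafka's TransactionCoordinator: it assumes that a TV2 commit and a timeout abort both need to bump the epoch and therefore fail at max epoch, that abort markers are written instantly, and that starting a transaction never checks whether the epoch is exhausted (the toy still requires the producer ID and epoch to match the stored ones).

{code:java}
// Hypothetical toy replay of the KAFKA-20090 race (not Kafka's TransactionCoordinator).
enum ToyState { ONGOING, COMPLETE_COMMIT, COMPLETE_ABORT }

final class ToyCoordinator {
    static final short MAX_EPOCH = Short.MAX_VALUE;
    long producerId;
    short epoch;
    ToyState state = ToyState.ONGOING;

    ToyCoordinator(long producerId, short epoch) {
        this.producerId = producerId;
        this.epoch = epoch;
    }

    // Assumption: commits and timeout aborts both bump the epoch, impossible at MAX_EPOCH.
    private boolean canBump() {
        return epoch < MAX_EPOCH;
    }

    // Step 1: fencing abort on timeout. Bumps the epoch but keeps the producer ID.
    boolean timeoutAbort() {
        if (state != ToyState.ONGOING || !canBump()) return false;
        epoch++;
        state = ToyState.COMPLETE_ABORT; // markers assumed written immediately
        return true;
    }

    // Step 2: client EndTxn(ABORT). Only the retry path is modeled; anything else is fenced.
    short endTxnAbort(long pid, short requestEpoch) {
        boolean looksLikeRetry = state == ToyState.COMPLETE_ABORT
                && pid == producerId && epoch == requestEpoch + 1;
        if (looksLikeRetry) return epoch; // hands back the stored epoch, here MAX_EPOCH
        throw new IllegalStateException("producer fenced");
    }

    // Step 3: starting a transaction checks only that producer ID and epoch match.
    boolean beginTxn(long pid, short requestEpoch) {
        if (state == ToyState.ONGOING || pid != producerId || requestEpoch != epoch) return false;
        state = ToyState.ONGOING;
        return true;
    }

    // Step 4: a TV2 commit must bump the epoch on completion.
    boolean commit() {
        if (state != ToyState.ONGOING || !canBump()) return false;
        epoch++;
        state = ToyState.COMPLETE_COMMIT;
        return true;
    }

    public static void main(String[] args) {
        ToyCoordinator c = new ToyCoordinator(10L, (short) (MAX_EPOCH - 1));
        c.timeoutAbort();                                              // epoch is now MAX_EPOCH
        short returned = c.endTxnAbort(10L, (short) (MAX_EPOCH - 1)); // treated as a "retry"
        c.beginTxn(10L, returned);                                     // new transaction at MAX_EPOCH
        System.out.println(c.commit() + " " + c.timeoutAbort() + " " + c.state); // false false ONGOING
    }
}
{code}

In this toy the final transaction can neither commit nor be aborted on timeout, so it stays ONGOING, mirroring step 4.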
To capture this behavior, I modified
[https://github.com/apache/kafka/blob/aad33e4e41aaa94b06f10a5be0094b717b98900f/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala#L1329]
by adding the following code to the end of the test:
{code:java}
// Transition to COMPLETE_ABORT since we can't do it via the write-markers response callback
txnMetadata.completeTransitionTo(new TxnTransitMetadata(
  txnMetadata.producerId(),
  txnMetadata.prevProducerId(),
  txnMetadata.nextProducerId(),
  Short.MaxValue,     // epoch after the fencing bump
  Short.MaxValue - 1, // epoch before the fencing bump
  txnTimeoutMs,
  txnMetadata.pendingState().get(),
  new util.HashSet[TopicPartition](),
  txnMetadata.txnLastUpdateTimestamp(),
  txnMetadata.txnLastUpdateTimestamp(),
  TV_2))

// EndTxn(ABORT) from the original producer, racing the timeout abort
coordinator.handleEndTransaction(transactionalId, producerId, epochAtMaxBoundary,
  TransactionResult.ABORT, TV_2, endTxnCallback)

// Bug: the request is treated as a retry and succeeds with max epoch instead of being fenced
assertEquals(10, newProducerId)
assertEquals(Short.MaxValue, newEpoch)
assertEquals(Errors.NONE, error)
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)