Kuan Po Tseng created KAFKA-18401:
-------------------------------------

             Summary: Transaction version 2 does not support commit transaction 
without records
                 Key: KAFKA-18401
                 URL: https://issues.apache.org/jira/browse/KAFKA-18401
             Project: Kafka
          Issue Type: Bug
            Reporter: Kuan Po Tseng
            Assignee: Kuan Po Tseng


This issue was observed when implementing 
https://issues.apache.org/jira/browse/KAFKA-18206.

In short, under transaction version 2, it doesn't support commit transaction 
without sending any records while transaction version 0 & 1 do support this 
kind of scenario.

Commit transactions without sending any records is fine when using transaction 
versions 0 or 1 because the producer won't send EndTxnRequest to the broker 
[0]. However, with transaction version 2, the producer still sends an 
EndTxnRequest to the broker while in transaction coordinator, the txn state is 
still in EMPTY, resulting in an error from the broker.

This issue can be reproduced with the test in below. I'm unsure if this 
behavior is expected. If it's not, one potential fix could be to follow the 
approach used in TV_0 and TV_1, where the EndTxnRequest is not sent if no 
partitions or offsets have been successfully added to the transaction. If this 
behavior is expected, we should document it and let user know this change.
{code:java}
    @ClusterTests({
        @ClusterTest(brokers = 3, features = {
            @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 
0)}),
        @ClusterTest(brokers = 3, features = {
            @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 
1)}),
        @ClusterTest(brokers = 3, features = {
            @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 
2)})
    })
    public void testProducerEndTransaction2(ClusterInstance cluster) throws 
InterruptedException {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "foobar");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test");
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        try (Producer<byte[], byte[]> producer1 = cluster.producer(properties)) 
{

            producer1.initTransactions();
            producer1.beginTransaction();
            producer1.commitTransaction(); // In TV_2, we'll get 
InvalidTxnStateException
        }
    }
{code}
Another test case, which is essentially the same as the previous one, starts 
with a transaction that includes records, and then proceeds to start the next 
transaction. When using transaction version 2, we encounter an error, but this 
time it's a different error from the one seen in the previous case.
{code:java}
    @ClusterTests({
        @ClusterTest(brokers = 3, features = {
            @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 
0)}),
        @ClusterTest(brokers = 3, features = {
            @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 
1)}),
        @ClusterTest(brokers = 3, features = {
            @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 
2)})
    })
    public void testProducerEndTransaction(ClusterInstance cluster) {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "foobar");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test");
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        try (Producer<byte[], byte[]> producer1 = cluster.producer(properties)) 
{

            producer1.initTransactions();
            producer1.beginTransaction();
            producer1.send(new ProducerRecord<>("test", "key".getBytes(), 
"value".getBytes()));
            producer1.commitTransaction();

            producer1.beginTransaction();
            producer1.commitTransaction(); // In TV_2, we'll get 
ProducerFencedException
        }
    }
{code}
 

[0]: 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L857-L865]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to