[ 
https://issues.apache.org/jira/browse/KAFKA-18401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan resolved KAFKA-18401.
------------------------------------
    Resolution: Fixed

> Transaction version 2 does not support commit transaction without records
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-18401
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18401
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Kuan Po Tseng
>            Assignee: Kuan Po Tseng
>            Priority: Blocker
>             Fix For: 4.0.0
>
>
> This issue was observed when implementing 
> https://issues.apache.org/jira/browse/KAFKA-18206.
> In short, transaction version 2 does not support committing a transaction 
> without sending any records, whereas transaction versions 0 and 1 do.
> Committing a transaction without sending any records works with 
> transaction versions 0 and 1 because the producer does not send an 
> EndTxnRequest to the broker [0]. With transaction version 2, however, the 
> producer still sends an EndTxnRequest, but the transaction state in the 
> transaction coordinator is still EMPTY, so the broker returns an error.
> This issue can be reproduced with the test below. I'm unsure whether this 
> behavior is expected. If it is not, one potential fix is to follow the 
> approach used in TV_0 and TV_1, where the EndTxnRequest is not sent if no 
> partitions or offsets have been successfully added to the transaction. If 
> it is expected, we should document it and let users know about this 
> change.
> {code:java}
>     @ClusterTests({
>         @ClusterTest(brokers = 3, features = {
>             @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 0)}),
>         @ClusterTest(brokers = 3, features = {
>             @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 1)}),
>         @ClusterTest(brokers = 3, features = {
>             @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2)})
>     })
>     public void testProducerEndTransaction2(ClusterInstance cluster) throws InterruptedException {
>         Map<String, Object> properties = new HashMap<>();
>         properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "foobar");
>         properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test");
>         properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
>         try (Producer<byte[], byte[]> producer1 = cluster.producer(properties)) {
>             producer1.initTransactions();
>             producer1.beginTransaction();
>             producer1.commitTransaction(); // In TV_2, we'll get InvalidTxnStateException
>         }
>     }
> {code}
> A second test case, essentially the same as the first, commits a 
> transaction that includes records and then begins and commits a second 
> transaction without sending anything. With transaction version 2 this also 
> fails, but with a different error than in the previous case.
> {code:java}
>     @ClusterTests({
>         @ClusterTest(brokers = 3, features = {
>             @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 0)}),
>         @ClusterTest(brokers = 3, features = {
>             @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 1)}),
>         @ClusterTest(brokers = 3, features = {
>             @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2)})
>     })
>     public void testProducerEndTransaction(ClusterInstance cluster) {
>         Map<String, Object> properties = new HashMap<>();
>         properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "foobar");
>         properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test");
>         properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
>         try (Producer<byte[], byte[]> producer1 = cluster.producer(properties)) {
>             producer1.initTransactions();
>             producer1.beginTransaction();
>             producer1.send(new ProducerRecord<>("test", "key".getBytes(), "value".getBytes()));
>             producer1.commitTransaction();
>             producer1.beginTransaction();
>             producer1.commitTransaction(); // In TV_2, we'll get ProducerFencedException
>         }
>     }
> {code}
>  
> [0]: 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L857-L865]
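
The potential fix mentioned in the description (skip the EndTxnRequest when no partitions or offsets have been added to the transaction) can be illustrated with a small standalone sketch. This is only a model of the idea, not the change that resolved this issue and not Kafka's TransactionManager: the names EndTxnSketch, TxnBookkeeping, and shouldSendEndTxn are hypothetical and exist only for this example.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the guard proposed in the issue; not Kafka's actual TransactionManager.
final class EndTxnSketch {

    /** Hypothetical stand-in for what the producer tracks about the current transaction. */
    static final class TxnBookkeeping {
        private final Set<String> partitions = new HashSet<>();
        private boolean offsetsAdded = false;

        void addPartition(String topicPartition) { partitions.add(topicPartition); }
        void addOffsets() { offsetsAdded = true; }

        /** True when neither partitions nor offsets have been added to the transaction. */
        boolean isEmpty() { return partitions.isEmpty() && !offsetsAdded; }
    }

    /** The guard: only contact the coordinator if the transaction actually contains something. */
    static boolean shouldSendEndTxn(TxnBookkeeping txn) {
        return !txn.isEmpty();
    }

    public static void main(String[] args) {
        // beginTransaction() followed directly by commitTransaction(): nothing was added.
        TxnBookkeeping empty = new TxnBookkeeping();
        System.out.println(shouldSendEndTxn(empty));       // false

        // A record was sent to partition "test-0" before the commit.
        TxnBookkeeping withRecord = new TxnBookkeeping();
        withRecord.addPartition("test-0");
        System.out.println(shouldSendEndTxn(withRecord));  // true
    }
}
```

Under this model, the empty commit in both test cases above would complete locally without a request to the coordinator, which matches the behavior the description attributes to TV_0 and TV_1.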



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
