[ https://issues.apache.org/jira/browse/KAFKA-18401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Justine Olshan resolved KAFKA-18401.
------------------------------------
    Resolution: Fixed

> Transaction version 2 does not support commit transaction without records
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-18401
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18401
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Kuan Po Tseng
>            Assignee: Kuan Po Tseng
>            Priority: Blocker
>             Fix For: 4.0.0
>
>
> This issue was observed when implementing
> https://issues.apache.org/jira/browse/KAFKA-18206.
> In short, transaction version 2 does not support committing a transaction
> without sending any records, whereas transaction versions 0 and 1 do
> support this scenario.
> Committing a transaction without sending any records works with
> transaction versions 0 and 1 because the producer does not send an
> EndTxnRequest to the broker [0]. With transaction version 2, however, the
> producer still sends an EndTxnRequest while the txn state in the
> transaction coordinator is still EMPTY, which results in an error from the
> broker.
> This issue can be reproduced with the test below. I'm unsure whether this
> behavior is expected. If it is not, one potential fix could be to follow the
> approach used in TV_0 and TV_1, where the EndTxnRequest is not sent if no
> partitions or offsets have been successfully added to the transaction. If
> this behavior is expected, we should document it and let users know about
> this change.
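The difference described above can be sketched as a small decision function. This is only an illustrative model of the reported behavior and of the potential fix mentioned in the description; the class `EndTxnSketch` and both method names are hypothetical and are not taken from `TransactionManager`. Whether the fix that was eventually merged takes this form is not stated in this message.

```java
// Hypothetical model of the behavior described in the report.
// It is NOT the real TransactionManager logic; names are made up for illustration.
final class EndTxnSketch {

    /**
     * Behavior as reported: TV_0 and TV_1 skip the EndTxnRequest when nothing
     * was added to the transaction, while TV_2 sends it regardless.
     */
    static boolean sendsEndTxnAsReported(int transactionVersion, boolean partitionsOrOffsetsAdded) {
        if (transactionVersion >= 2) {
            // The request reaches a coordinator whose txn state is still EMPTY.
            return true;
        }
        return partitionsOrOffsetsAdded;
    }

    /**
     * Potential fix from the description: apply the TV_0/TV_1 rule to every
     * transaction version, so the version no longer influences the decision.
     */
    static boolean sendsEndTxnWithProposedFix(int transactionVersion, boolean partitionsOrOffsetsAdded) {
        return partitionsOrOffsetsAdded;
    }

    public static void main(String[] args) {
        // Print the decision for an empty commit under each transaction version.
        for (int tv = 0; tv <= 2; tv++) {
            System.out.printf("TV_%d empty commit: reported=%b, proposed=%b%n",
                    tv,
                    sendsEndTxnAsReported(tv, false),
                    sendsEndTxnWithProposedFix(tv, false));
        }
    }
}
```

In this model, only the TV_2 row differs between the two columns, which matches the empty-commit case that the tests in this report exercise.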
> {code:java}
> @ClusterTests({
>     @ClusterTest(brokers = 3, features = {
>         @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 0)}),
>     @ClusterTest(brokers = 3, features = {
>         @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 1)}),
>     @ClusterTest(brokers = 3, features = {
>         @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2)})
> })
> public void testProducerEndTransaction2(ClusterInstance cluster) throws InterruptedException {
>     Map<String, Object> properties = new HashMap<>();
>     properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "foobar");
>     properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test");
>     properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
>     try (Producer<byte[], byte[]> producer1 = cluster.producer(properties)) {
>         producer1.initTransactions();
>         producer1.beginTransaction();
>         producer1.commitTransaction(); // In TV_2, we'll get InvalidTxnStateException
>     }
> }
> {code}
> Another test case, which is essentially the same as the previous one, starts
> with a transaction that includes records, and then proceeds to start the next
> transaction. When using transaction version 2, we encounter an error, but
> this time it's a different error from the one seen in the previous case.
> {code:java}
> @ClusterTests({
>     @ClusterTest(brokers = 3, features = {
>         @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 0)}),
>     @ClusterTest(brokers = 3, features = {
>         @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 1)}),
>     @ClusterTest(brokers = 3, features = {
>         @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2)})
> })
> public void testProducerEndTransaction(ClusterInstance cluster) {
>     Map<String, Object> properties = new HashMap<>();
>     properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "foobar");
>     properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test");
>     properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
>     try (Producer<byte[], byte[]> producer1 = cluster.producer(properties)) {
>         producer1.initTransactions();
>         producer1.beginTransaction();
>         producer1.send(new ProducerRecord<>("test", "key".getBytes(), "value".getBytes()));
>         producer1.commitTransaction();
>         producer1.beginTransaction();
>         producer1.commitTransaction(); // In TV_2, we'll get ProducerFencedException
>     }
> }
> {code}
>
> [0]:
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L857-L865]

--
This message was sent by Atlassian Jira
(v8.20.10#820010)