jolshan commented on code in PR #17402:
URL: https://github.com/apache/kafka/pull/17402#discussion_r1806753739

##########
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##########
@@ -814,11 +913,31 @@ class TransactionsTest extends IntegrationTestHarness {
     producer3.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "4", "4", willBeCommitted = true))
     producer3.commitTransaction()
 
-    // Check that the epoch only increased by 1
     producerStateEntry = brokers(partitionLeader).logManager.getLog(new TopicPartition(topic1, 0)).get.producerStateManager.activeProducers.get(producerId)
     assertNotNull(producerStateEntry)
-    assertEquals((initialProducerEpoch + 1).toShort, producerStateEntry.producerEpoch)
+
+    // Check that the epoch only increased by 1 when TV2 is disabled.
+    // With TV2 and the latest EndTxnRequest version, the epoch will be bumped at the end of every transaction.
+    if (!isTV2Enabled) assertEquals((initialProducerEpoch + 1).toShort, producerStateEntry.producerEpoch)
+    else {
+      // Producer State entry contains the last epoch with which records were sent.
+      assertEquals((initialProducerEpoch + 2).toShort, producerStateEntry.producerEpoch)
+
+      // Access the client's producer epoch via reflection to verify epoch bump on the last End Txn Request.
+      val transactionManagerField: Field = classOf[KafkaProducer[_, _]].getDeclaredField("transactionManager")

Review Comment:
   Why are we adding this reflection? I think we can just use producerStateEntry.producerEpoch to see that it is initialProducerEpoch + 2. (If we wanted, we could also add a check after the commit to see that it was increased by 1.)
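
A minimal sketch of the simpler, reflection-free check suggested in the review comment. The helper `expectedProducerStateEpoch` and the object `EpochExpectation` are hypothetical names introduced here for illustration and are not part of the PR. The offsets (+1 without TV2, +2 with TV2) are taken from the assertions in the diff above.

```scala
object EpochExpectation {
  // Hypothetical helper (not in the PR): the epoch the test expects to find in
  // the broker's producer state entry at this point in the test.
  // Offsets come from the diff: +1 when TV2 is disabled, +2 when it is enabled.
  def expectedProducerStateEpoch(initialProducerEpoch: Short, isTV2Enabled: Boolean): Short = {
    val bump = if (isTV2Enabled) 2 else 1
    (initialProducerEpoch + bump).toShort
  }
}
```

Under these assumptions, the test could compare `EpochExpectation.expectedProducerStateEpoch(initialProducerEpoch, isTV2Enabled)` against `producerStateEntry.producerEpoch` in a single `assertEquals`, without reading the client's `transactionManager` field.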