jolshan commented on code in PR #17402:
URL: https://github.com/apache/kafka/pull/17402#discussion_r1806753739


##########
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##########
@@ -814,11 +913,31 @@ class TransactionsTest extends IntegrationTestHarness {
     
producer3.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, 
"4", "4", willBeCommitted = true))
     producer3.commitTransaction()
 
-    // Check that the epoch only increased by 1
     producerStateEntry =
       brokers(partitionLeader).logManager.getLog(new TopicPartition(topic1, 
0)).get.producerStateManager.activeProducers.get(producerId)
     assertNotNull(producerStateEntry)
-    assertEquals((initialProducerEpoch + 1).toShort, 
producerStateEntry.producerEpoch)
+
+    // Check that the epoch only increased by 1 when TV2 is disabled.
+    // With TV2 and the latest EndTxnRequest version, the epoch will be bumped 
at the end of every transaction.
+    if (!isTV2Enabled) assertEquals((initialProducerEpoch + 1).toShort, 
producerStateEntry.producerEpoch)
+    else {
+      // Producer State entry contains the last epoch with which records were 
sent.
+      assertEquals((initialProducerEpoch + 2).toShort, 
producerStateEntry.producerEpoch)
+
+      // Access the client's producer epoch via reflection to verify epoch 
bump on the last End Txn Request.
+      val transactionManagerField: Field = classOf[KafkaProducer[_, 
_]].getDeclaredField("transactionManager")

Review Comment:
   Why are we adding this reflection? I think we can just use the 
producerStateEntry.producerEpoch to see it is initialProducerEpoch + 2. (If we 
wanted we could also have a check after the commit to see it was incread by 1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to