jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1221911516

##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -2095,82 +2105,143 @@ class ReplicaManagerTest {
   }
 
   @Test
-  def testVerificationForTransactionalPartitions(): Unit = {
-    val tp = new TopicPartition(topic, 0)
-    val transactionalId = "txn1"
+  def testVerificationForTransactionalPartitionsOnly(): Unit = {
+    val tp0 = new TopicPartition(topic, 0)
+    val tp1 = new TopicPartition(topic, 1)
     val producerId = 24L
     val producerEpoch = 0.toShort
     val sequence = 0
-    
-    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
-    val metadataCache = mock(classOf[MetadataCache])
+    val node = new Node(0, "host1", 0)
     val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
 
-    val replicaManager = new ReplicaManager(
-      metrics = metrics,
-      config = config,
-      time = time,
-      scheduler = new MockScheduler(time),
-      logManager = mockLogMgr,
-      quotaManagers = quotaManager,
-      metadataCache = metadataCache,
-      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
-      alterPartitionManager = alterPartitionManager,
-      addPartitionsToTxnManager = Some(addPartitionsToTxnManager))
-
+    val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0, tp1), node)
     try {
-      val becomeLeaderRequest = makeLeaderAndIsrRequest(topicIds(tp.topic), tp, Seq(0, 1), LeaderAndIsr(1,  List(0, 1)))
-      replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
 
-      // We must set up the metadata cache to handle the append and verification.
-      val metadataResponseTopic = Seq(new MetadataResponseTopic()
-        .setName(Topic.TRANSACTION_STATE_TOPIC_NAME)
-        .setPartitions(Seq(
-          new MetadataResponsePartition()
-            .setPartitionIndex(0)
-            .setLeaderId(0)).asJava))
-      val node = new Node(0, "host1", 0)
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp1.topic), tp1, Seq(0, 1), LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
 
-      when(metadataCache.contains(tp)).thenReturn(true)
-      when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), config.interBrokerListenerName)).thenReturn(metadataResponseTopic)
-      when(metadataCache.getAliveBrokerNode(0, config.interBrokerListenerName)).thenReturn(Some(node))
-      when(metadataCache.getAliveBrokerNode(1, config.interBrokerListenerName)).thenReturn(None)
-      
-      // We will attempt to schedule to the request handler thread using a non request handler thread. Set this to avoid error.
-      KafkaRequestHandler.setBypassThreadCheck(true)
+      // If we supply no transactional ID and idempotent records, we do not verify.
+      val idempotentRecords = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+      appendRecords(replicaManager, tp0, idempotentRecords)
+      verify(addPartitionsToTxnManager, times(0)).addTxnData(any(), any(), any[AddPartitionsToTxnManager.AppendCallback]())
+      assertEquals(null, getVerificationGuard(replicaManager, tp0, producerId))
+
+      // If we supply a transactional ID and some transactional and some idempotent records, we should only verify the topic partition with transactional records.
+      val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence + 1,
+        new SimpleRecord("message".getBytes))
+
+      val transactionToAdd = new AddPartitionsToTxnTransaction()
+        .setTransactionalId(transactionalId)
+        .setProducerId(producerId)
+        .setProducerEpoch(producerEpoch)
+        .setVerifyOnly(true)
+        .setTopics(new AddPartitionsToTxnTopicCollection(
+          Seq(new AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
+        ))
+
+      val idempotentRecords2 = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+      appendRecordsToMultipleTopics(replicaManager, Map(tp0 -> transactionalRecords, tp1 -> idempotentRecords2), transactionalId, Some(0))
+      verify(addPartitionsToTxnManager, times(1)).addTxnData(ArgumentMatchers.eq(node), ArgumentMatchers.eq(transactionToAdd), any[AddPartitionsToTxnManager.AppendCallback]())
+      assertNotEquals(null, getVerificationGuard(replicaManager, tp0, producerId))
+      assertEquals(null, getVerificationGuard(replicaManager, tp1, producerId))
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+  }
+
+  @Test
+  def testVerificationFlow(): Unit = {

Review Comment:
   I'm so deep into the transaction world that I forget not everyone is there. 😵‍💫
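
   For anyone less steeped in transaction land, the distinction this test leans on is simply how the two kinds of batches are built. A minimal sketch, not part of the PR, using the same values as the test above:

       import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}

       val producerId = 24L
       val producerEpoch = 0.toShort
       val sequence = 0

       // Idempotent batch: carries a producer ID but is not marked transactional,
       // so the append proceeds without any transaction verification.
       val idempotentRecords = MemoryRecords.withIdempotentRecords(CompressionType.NONE,
         producerId, producerEpoch, sequence, new SimpleRecord("message".getBytes))

       // Transactional batch: same producer ID, but flagged transactional, so the
       // partition is first verified against the ongoing transaction (via the
       // AddPartitionsToTxnManager with verifyOnly = true) before it is written.
       val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE,
         producerId, producerEpoch, sequence + 1, new SimpleRecord("message".getBytes))

   In the test, only the partition that received the transactional batch (tp0) picks up a verification guard; the partition with the idempotent batch (tp1) is appended without one.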


