jolshan commented on code in PR #13787: URL: https://github.com/apache/kafka/pull/13787#discussion_r1221911516
########## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ########## @@ -2095,82 +2105,143 @@ class ReplicaManagerTest { } @Test - def testVerificationForTransactionalPartitions(): Unit = { - val tp = new TopicPartition(topic, 0) - val transactionalId = "txn1" + def testVerificationForTransactionalPartitionsOnly(): Unit = { + val tp0 = new TopicPartition(topic, 0) + val tp1 = new TopicPartition(topic, 1) val producerId = 24L val producerEpoch = 0.toShort val sequence = 0 - - val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) - val metadataCache = mock(classOf[MetadataCache]) + val node = new Node(0, "host1", 0) val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager]) - val replicaManager = new ReplicaManager( - metrics = metrics, - config = config, - time = time, - scheduler = new MockScheduler(time), - logManager = mockLogMgr, - quotaManagers = quotaManager, - metadataCache = metadataCache, - logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), - alterPartitionManager = alterPartitionManager, - addPartitionsToTxnManager = Some(addPartitionsToTxnManager)) - + val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0, tp1), node) try { - val becomeLeaderRequest = makeLeaderAndIsrRequest(topicIds(tp.topic), tp, Seq(0, 1), LeaderAndIsr(1, List(0, 1))) - replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) + replicaManager.becomeLeaderOrFollower(1, + makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))), + (_, _) => ()) - // We must set up the metadata cache to handle the append and verification. - val metadataResponseTopic = Seq(new MetadataResponseTopic() - .setName(Topic.TRANSACTION_STATE_TOPIC_NAME) - .setPartitions(Seq( - new MetadataResponsePartition() - .setPartitionIndex(0) - .setLeaderId(0)).asJava)) - val node = new Node(0, "host1", 0) + replicaManager.becomeLeaderOrFollower(1, + makeLeaderAndIsrRequest(topicIds(tp1.topic), tp1, Seq(0, 1), LeaderAndIsr(1, List(0, 1))), + (_, _) => ()) - when(metadataCache.contains(tp)).thenReturn(true) - when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), config.interBrokerListenerName)).thenReturn(metadataResponseTopic) - when(metadataCache.getAliveBrokerNode(0, config.interBrokerListenerName)).thenReturn(Some(node)) - when(metadataCache.getAliveBrokerNode(1, config.interBrokerListenerName)).thenReturn(None) - - // We will attempt to schedule to the request handler thread using a non request handler thread. Set this to avoid error. - KafkaRequestHandler.setBypassThreadCheck(true) + // If we supply no transactional ID and idempotent records, we do not verify. + val idempotentRecords = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence, + new SimpleRecord("message".getBytes)) + appendRecords(replicaManager, tp0, idempotentRecords) + verify(addPartitionsToTxnManager, times(0)).addTxnData(any(), any(), any[AddPartitionsToTxnManager.AppendCallback]()) + assertEquals(null, getVerificationGuard(replicaManager, tp0, producerId)) + + // If we supply a transactional ID and some transactional and some idempotent records, we should only verify the topic partition with transactional records. + val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence + 1, + new SimpleRecord("message".getBytes)) + + val transactionToAdd = new AddPartitionsToTxnTransaction() + .setTransactionalId(transactionalId) + .setProducerId(producerId) + .setProducerEpoch(producerEpoch) + .setVerifyOnly(true) + .setTopics(new AddPartitionsToTxnTopicCollection( + Seq(new AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava + )) + + val idempotentRecords2 = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence, + new SimpleRecord("message".getBytes)) + appendRecordsToMultipleTopics(replicaManager, Map(tp0 -> transactionalRecords, tp1 -> idempotentRecords2), transactionalId, Some(0)) + verify(addPartitionsToTxnManager, times(1)).addTxnData(ArgumentMatchers.eq(node), ArgumentMatchers.eq(transactionToAdd), any[AddPartitionsToTxnManager.AppendCallback]()) + assertNotEquals(null, getVerificationGuard(replicaManager, tp0, producerId)) + assertEquals(null, getVerificationGuard(replicaManager, tp1, producerId)) + } finally { + replicaManager.shutdown() + } + + TestUtils.assertNoNonDaemonThreads(this.getClass.getName) + } + + @Test + def testVerificationFlow(): Unit = { Review Comment: I'm too into transaction world that I forget not everyone is there. 😵💫 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org