tinaselenge commented on code in PR #14375:
URL: https://github.com/apache/kafka/pull/14375#discussion_r1428029215


##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3884,6 +3938,116 @@ class ReplicaManagerTest {
     }
   }
 
+  @Test
+  def testSuccessfulBuildRemoteLogAuxStateMetrics(): Unit = {
+    val tp0 = new TopicPartition(topic, 0)
+
+    val remoteLogManager = mock(classOf[RemoteLogManager])
+    val remoteLogSegmentMetadata = mock(classOf[RemoteLogSegmentMetadata])
+    when(remoteLogManager.fetchRemoteLogSegmentMetadata(any(), anyInt(), 
anyLong())).thenReturn(
+      Optional.of(remoteLogSegmentMetadata)
+    )
+    val storageManager = mock(classOf[RemoteStorageManager])
+    when(storageManager.fetchIndex(any(), any())).thenReturn(new 
ByteArrayInputStream("0".getBytes()))
+    when(remoteLogManager.storageManager()).thenReturn(storageManager)
+
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, 
shouldMockLog = true, remoteLogManager = Some(remoteLogManager), 
buildRemoteLogAuxState = true)
+    try {
+
+      val offsetCheckpoints = new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+      replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, 
isFutureReplica = false, offsetCheckpoints, None)
+      val partition0Replicas = Seq[Integer](0, 1).asJava
+      val topicIds = Map(tp0.topic -> topicId).asJava
+      val leaderAndIsrRequest = new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, 
brokerEpoch,
+        Seq(
+          new LeaderAndIsrPartitionState()
+            .setTopicName(tp0.topic)
+            .setPartitionIndex(tp0.partition)
+            .setControllerEpoch(0)
+            .setLeader(1)
+            .setLeaderEpoch(0)
+            .setIsr(partition0Replicas)
+            .setPartitionEpoch(0)
+            .setReplicas(partition0Replicas)
+            .setIsNew(true)
+        ).asJava,
+        topicIds,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+
+      // Verify the metrics for build remote log state and for failures is 
zero before replicas start to fetch
+      assertEquals(0, 
brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count)
+      assertEquals(0, 
brokerTopicStats.topicStats(tp0.topic()).failedBuildRemoteLogAuxStateRate.count)
+      // Verify aggregate metrics
+      assertEquals(0, 
brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count)
+      assertEquals(0, 
brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count)
+
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => 
())
+
+      // Replicas fetch from the leader periodically, therefore we check that 
the metric value is increasing
+      
assertTrue(brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count
 > 0)
+      assertEquals(0, 
brokerTopicStats.topicStats(tp0.topic()).failedBuildRemoteLogAuxStateRate.count)
+      // Verify aggregate metrics
+      
assertTrue(brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count
 > 0)
+      assertEquals(0, 
brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count)
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @Test
+  def testFailedBuildRemoteLogAuxStateMetrics(): Unit = {

Review Comment:
   ```scala
   when(remoteLogManager.fetchRemoteLogSegmentMetadata(any(), anyInt(), anyLong()))
     .thenReturn(Optional.empty())
   ```
   This is basically what testFailedBuildRemoteLogAuxStateMetrics() already
   does: because we are not mocking remoteLogSegmentMetadata in that test,
   fetchRemoteLogSegmentMetadata returns an empty result.
   
   However, I have added another test that covers the exception case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to