showuon commented on code in PR #14375:
URL: https://github.com/apache/kafka/pull/14375#discussion_r1426129619


##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3205,13 +3220,52 @@ class ReplicaManagerTest {
         threadNamePrefix: Option[String],
         quotaManager: ReplicationQuotaManager
       ): ReplicaFetcherManager = {
-        mockReplicaFetcherManager.getOrElse {
-          super.createReplicaFetcherManager(
-            metrics,
-            time,
-            threadNamePrefix,
-            quotaManager
-          )
+          mockReplicaFetcherManager.getOrElse {

Review Comment:
   nit: is the indentation correct here?
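   Just to illustrate (a sketch only, assuming the body should keep the same 2-space step as the removed line):
   
   ```
   ): ReplicaFetcherManager = {
     mockReplicaFetcherManager.getOrElse {
       // ... unchanged body ...
     }
   }
   ```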



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3884,6 +3938,116 @@ class ReplicaManagerTest {
     }
   }
 
+  @Test
+  def testSuccessfulBuildRemoteLogAuxStateMetrics(): Unit = {
+    val tp0 = new TopicPartition(topic, 0)
+
+    val remoteLogManager = mock(classOf[RemoteLogManager])
+    val remoteLogSegmentMetadata = mock(classOf[RemoteLogSegmentMetadata])
+    when(remoteLogManager.fetchRemoteLogSegmentMetadata(any(), anyInt(), anyLong())).thenReturn(
+      Optional.of(remoteLogSegmentMetadata)
+    )
+    val storageManager = mock(classOf[RemoteStorageManager])
+    when(storageManager.fetchIndex(any(), any())).thenReturn(new ByteArrayInputStream("0".getBytes()))
+    when(remoteLogManager.storageManager()).thenReturn(storageManager)
+
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteLogManager = Some(remoteLogManager), buildRemoteLogAuxState = true)
+    try {
+
+      val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+      replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+      val partition0Replicas = Seq[Integer](0, 1).asJava
+      val topicIds = Map(tp0.topic -> topicId).asJava
+      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(
+          new LeaderAndIsrPartitionState()
+            .setTopicName(tp0.topic)
+            .setPartitionIndex(tp0.partition)
+            .setControllerEpoch(0)
+            .setLeader(1)
+            .setLeaderEpoch(0)
+            .setIsr(partition0Replicas)
+            .setPartitionEpoch(0)
+            .setReplicas(partition0Replicas)
+            .setIsNew(true)
+        ).asJava,
+        topicIds,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+
+      // Verify the metrics for build remote log state and for failures is zero before replicas start to fetch
+      assertEquals(0, brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count)
+      assertEquals(0, brokerTopicStats.topicStats(tp0.topic()).failedBuildRemoteLogAuxStateRate.count)
+      // Verify aggregate metrics
+      assertEquals(0, brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count)
+      assertEquals(0, brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count)
+
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+
+      // Replicas fetch from the leader periodically, therefore we check that the metric value is increasing
+      assertTrue(brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count > 0)
+      assertEquals(0, brokerTopicStats.topicStats(tp0.topic()).failedBuildRemoteLogAuxStateRate.count)
+      // Verify aggregate metrics
+      assertTrue(brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count > 0)
+      assertEquals(0, brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count)
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @Test
+  def testFailedBuildRemoteLogAuxStateMetrics(): Unit = {

Review Comment:
   I'm thinking we should add one more test case: when `remoteLogManager.fetchRemoteLogSegmentMetadata` throws an exception, we should also mark the error rate. So maybe we can parameterize the test? ex:
   
   ```
   @ParameterizedTest
   @ValueSource(booleans = Array(false, true))
   def testFailedBuildRemoteLogAuxStateMetrics(shouldThrowException: Boolean): Unit = {
     if (!shouldThrowException) {
       when(remoteLogManager.fetchRemoteLogSegmentMetadata(any(), anyInt(), anyLong())).thenReturn(
         Optional.empty())
     } else {
       when(remoteLogManager.fetchRemoteLogSegmentMetadata(any(), anyInt(), anyLong())).thenThrow(new Exception...)
     }
   ```
   
   WDYT?
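   In both branches we could then assert that the failure metric moved, reusing the assertion style from `testSuccessfulBuildRemoteLogAuxStateMetrics` (sketch only; the setup and fetch part would be the same as in that test):
   
   ```
   // after becomeLeaderOrFollower(...) triggers the follower fetch that hits the mocked failure
   assertTrue(brokerTopicStats.topicStats(tp0.topic()).failedBuildRemoteLogAuxStateRate.count > 0)
   assertTrue(brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count > 0)
   ```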



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1042,8 +1053,9 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE
             // If the follower HAS NOT picked up the changes, and they become the leader then they will go through this process
             // again and delete them with the original deletion reason i.e. size, time or log start offset breach.
             List<String> undeletedSegments = new ArrayList<>();
+            String topicName = log.topicPartition().topic();

Review Comment:
   nit: I think using `topicIdPartition.topic()` is more straightforward and consistent; in L1045 we already use `topicIdPartition.topicPartition()` to get the partition.



##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -229,6 +229,9 @@ private Long buildRemoteLogAuxState(TopicPartition topicPartition,
                 Partition partition = replicaMgr.getPartitionOrException(topicPartition);
                 partition.truncateFullyAndStartAt(nextOffset, false, Option.apply(leaderLogStartOffset));
 
+                replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark();
+                replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark();
+

Review Comment:
   Question: why do we update the request rate metric here?
   I'm thinking we could mark the request rate and error rate at the caller of the `buildRemoteLogAuxState` method, that is, in `ReplicaFetcherTierStateMachine#start`:
   
   ```
   long offsetToFetch = 0;
   // mark request rate here
   try {
     offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset());
   } catch (RemoteStorageException e) {
     // mark error rate here
     throw e;
   }
   ```
   
   This way, if an exception is thrown from other places, like the `fetchRemoteLogSegmentMetadata` call at L219, we can still catch it and update the metrics. WDYT?
   
   



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3205,13 +3220,52 @@ class ReplicaManagerTest {
         threadNamePrefix: Option[String],
         quotaManager: ReplicationQuotaManager
       ): ReplicaFetcherManager = {
-        mockReplicaFetcherManager.getOrElse {
-          super.createReplicaFetcherManager(
-            metrics,
-            time,
-            threadNamePrefix,
-            quotaManager
-          )
+          mockReplicaFetcherManager.getOrElse {
+            if (buildRemoteLogAuxState) {
+              super.createReplicaFetcherManager(
+                metrics,
+                time,
+                threadNamePrefix,
+                quotaManager
+              )
+              val config = this.config
+              val metadataCache = this.metadataCache
+              new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager, () => metadataCache.metadataVersion(), () => 1) {
+                override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): ReplicaFetcherThread = {
+                  val prefix = threadNamePrefix.map(tp => s"$tp:").getOrElse("")
+                  val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
+
+                  val tp = new TopicPartition(topic, 0)
+                  val leader = new MockLeaderEndPoint() {
+                    override def fetch(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = {
+                      Map(tp -> new FetchData().setErrorCode(Errors.OFFSET_MOVED_TO_TIERED_STORAGE.code))
+                    }
+                  }
+                  leader.setLeaderState(tp, PartitionState(leaderEpoch = 0))
+                  leader.setReplicaPartitionStateCallback(tp => PartitionState(leaderEpoch = 0))
+
+                  val fetcher = new ReplicaFetcherThread(threadName, leader, config, failedPartitions, replicaManager,
+                    quotaManager, "", () => config.interBrokerProtocolVersion)
+
+                  val initialFetchState = InitialFetchState(
+                    topicId = Some(Uuid.randomUuid()),
+                    leader = leader.brokerEndPoint(),
+                    currentLeaderEpoch = 0,
+                    initOffset = 0)
+
+                  fetcher.addPartitions(Map(tp -> initialFetchState))
+
+                  fetcher
+                }
+              }
+            } else {
+              super.createReplicaFetcherManager(
+                metrics,
+                time,
+                threadNamePrefix,
+                quotaManager
+              )

Review Comment:
   These lines duplicate L3224. Could we move them before the if/else block?
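   Roughly what I have in mind, sketch only (`defaultFetcherManager` is just an illustrative name; the `createFetcherThread` override body stays as in the current diff):
   
   ```
   mockReplicaFetcherManager.getOrElse {
     // call super only once instead of in both branches
     val defaultFetcherManager = super.createReplicaFetcherManager(metrics, time, threadNamePrefix, quotaManager)
     if (buildRemoteLogAuxState) {
       val config = this.config
       val metadataCache = this.metadataCache
       new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager, () => metadataCache.metadataVersion(), () => 1) {
         override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): ReplicaFetcherThread = {
           // ... same body as in the current diff ...
         }
       }
     } else {
       defaultFetcherManager
     }
   }
   ```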



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
