showuon commented on code in PR #14375:
URL: https://github.com/apache/kafka/pull/14375#discussion_r1426129619
##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3205,13 +3220,52 @@ class ReplicaManagerTest {
       threadNamePrefix: Option[String],
       quotaManager: ReplicationQuotaManager
     ): ReplicaFetcherManager = {
-      mockReplicaFetcherManager.getOrElse {
-        super.createReplicaFetcherManager(
-          metrics,
-          time,
-          threadNamePrefix,
-          quotaManager
-        )
+      mockReplicaFetcherManager.getOrElse {

Review Comment:
   nit: did we get the indent correct here?

##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3884,6 +3938,116 @@ class ReplicaManagerTest {
     }
   }

+  @Test
+  def testSuccessfulBuildRemoteLogAuxStateMetrics(): Unit = {
+    val tp0 = new TopicPartition(topic, 0)
+
+    val remoteLogManager = mock(classOf[RemoteLogManager])
+    val remoteLogSegmentMetadata = mock(classOf[RemoteLogSegmentMetadata])
+    when(remoteLogManager.fetchRemoteLogSegmentMetadata(any(), anyInt(), anyLong())).thenReturn(
+      Optional.of(remoteLogSegmentMetadata)
+    )
+    val storageManager = mock(classOf[RemoteStorageManager])
+    when(storageManager.fetchIndex(any(), any())).thenReturn(new ByteArrayInputStream("0".getBytes()))
+    when(remoteLogManager.storageManager()).thenReturn(storageManager)
+
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteLogManager = Some(remoteLogManager), buildRemoteLogAuxState = true)
+    try {
+
+      val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+      replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+      val partition0Replicas = Seq[Integer](0, 1).asJava
+      val topicIds = Map(tp0.topic -> topicId).asJava
+      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(
+          new LeaderAndIsrPartitionState()
+            .setTopicName(tp0.topic)
+            .setPartitionIndex(tp0.partition)
+            .setControllerEpoch(0)
+            .setLeader(1)
+            .setLeaderEpoch(0)
+            .setIsr(partition0Replicas)
+            .setPartitionEpoch(0)
+            .setReplicas(partition0Replicas)
+            .setIsNew(true)
+        ).asJava,
+        topicIds,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+
+      // Verify the metrics for build remote log state and for failures is zero before replicas start to fetch
+      assertEquals(0, brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count)
+      assertEquals(0, brokerTopicStats.topicStats(tp0.topic()).failedBuildRemoteLogAuxStateRate.count)
+      // Verify aggregate metrics
+      assertEquals(0, brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count)
+      assertEquals(0, brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count)
+
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+
+      // Replicas fetch from the leader periodically, therefore we check that the metric value is increasing
+      assertTrue(brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count > 0)
+      assertEquals(0, brokerTopicStats.topicStats(tp0.topic()).failedBuildRemoteLogAuxStateRate.count)
+      // Verify aggregate metrics
+      assertTrue(brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count > 0)
+      assertEquals(0, brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count)
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @Test
+  def testFailedBuildRemoteLogAuxStateMetrics(): Unit = {

Review Comment:
   I'm thinking we should add one more test case: when `remoteLogManager.fetchRemoteLogSegmentMetadata` throws an exception, we should also mark the error rate. So maybe we could use a parameter in the test? ex:
   ```
   @Test
   def testFailedBuildRemoteLogAuxStateMetrics(shouldThrowException: Boolean): Unit = {
     if (!shouldThrowException)
       when(remoteLogManager.fetchRemoteLogSegmentMetadata(any(), anyInt(), anyLong())).thenReturn(
         Optional.empty())
     else {
       when(remoteLogManager.fetchRemoteLogSegmentMetadata(any(), anyInt(), anyLong())).thenThrow(new Exception...)
     }
   ```
   WDYT?
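   For illustration, here is a rough Scala sketch of that parameterized shape using JUnit 5's `@ParameterizedTest`/`@ValueSource` (a plain `@Test` cannot take a parameter). The exception message and the trailing commented assertions are assumptions, and the broker / `LeaderAndIsrRequest` setup is elided because it would mirror `testSuccessfulBuildRemoteLogAuxStateMetrics` above:
   ```scala
   import org.junit.jupiter.params.ParameterizedTest
   import org.junit.jupiter.params.provider.ValueSource
   import org.apache.kafka.server.log.remote.storage.RemoteStorageException

   @ParameterizedTest
   @ValueSource(booleans = Array(false, true))
   def testFailedBuildRemoteLogAuxStateMetrics(shouldThrowException: Boolean): Unit = {
     val remoteLogManager = mock(classOf[RemoteLogManager])
     if (shouldThrowException) {
       // fetchRemoteLogSegmentMetadata declares RemoteStorageException, so stubbing a throw is legal
       when(remoteLogManager.fetchRemoteLogSegmentMetadata(any(), anyInt(), anyLong()))
         .thenThrow(new RemoteStorageException("remote fetch failed"))
     } else {
       // Missing metadata should also fail the aux-state build
       when(remoteLogManager.fetchRemoteLogSegmentMetadata(any(), anyInt(), anyLong()))
         .thenReturn(Optional.empty())
     }

     // ... same ReplicaManager / LeaderAndIsrRequest setup as testSuccessfulBuildRemoteLogAuxStateMetrics ...

     // In both branches the failure meter is expected to move, e.g.:
     // assertTrue(brokerTopicStats.topicStats(topic).failedBuildRemoteLogAuxStateRate.count > 0)
     // assertTrue(brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count > 0)
   }
   ```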
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1042,8 +1053,9 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE
             // If the follower HAS NOT picked up the changes, and they become the leader then they will go through this process
             // again and delete them with the original deletion reason i.e. size, time or log start offset breach.
             List<String> undeletedSegments = new ArrayList<>();
+            String topicName = log.topicPartition().topic();

Review Comment:
   nit: I think using `topicIdPartition.topic()` is more straightforward and consistent; for example, at L1045 we use `topicIdPartition.topicPartition()` to get the partition.

##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -229,6 +229,9 @@ private Long buildRemoteLogAuxState(TopicPartition topicPartition,
         Partition partition = replicaMgr.getPartitionOrException(topicPartition);
         partition.truncateFullyAndStartAt(nextOffset, false, Option.apply(leaderLogStartOffset));
+        replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark();
+        replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark();
+

Review Comment:
   Question: why do we update the request rate metric here? I'm thinking we could mark the request rate and error rate at the caller of the `buildRemoteLogAuxState` method, that is, in `ReplicaFetcherTierStateMachine#start`:
   ```
   long offsetToFetch = 0;
   // mark request rate here
   try {
       offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(),
           leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset());
   } catch (RemoteStorageException e) {
       // mark error rate here
       throw e;
   }
   ```
   This way, if an exception is thrown from somewhere else, like the `fetchRemoteLogSegmentMetadata` call at L219, we can still catch it and update the metrics. WDYT?
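   For reference, a hypothetical fleshed-out version of that caller-side sketch, filling in the two placeholder comments with the meter calls from the diff above. The `failedBuildRemoteLogAuxStateRate()` accessor is assumed to exist alongside `buildRemoteLogAuxStateRequestRate()` (the name appears in the new test code); treat this as a sketch, not the PR's actual implementation:
   ```java
   // Inside ReplicaFetcherTierStateMachine#start (sketch, not the PR author's code):
   // mark every build attempt up front, then mark a failure for anything thrown while
   // building the aux state, including exceptions from fetchRemoteLogSegmentMetadata.
   replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark();
   replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark();

   long offsetToFetch = 0;
   try {
       offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(),
           leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset());
   } catch (RemoteStorageException e) {
       replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate().mark();
       replicaMgr.brokerTopicStats().allTopicsStats().failedBuildRemoteLogAuxStateRate().mark();
       throw e;
   }
   ```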
##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3205,13 +3220,52 @@ class ReplicaManagerTest {
       threadNamePrefix: Option[String],
       quotaManager: ReplicationQuotaManager
     ): ReplicaFetcherManager = {
-      mockReplicaFetcherManager.getOrElse {
-        super.createReplicaFetcherManager(
-          metrics,
-          time,
-          threadNamePrefix,
-          quotaManager
-        )
+      mockReplicaFetcherManager.getOrElse {
+        if (buildRemoteLogAuxState) {
+          super.createReplicaFetcherManager(
+            metrics,
+            time,
+            threadNamePrefix,
+            quotaManager
+          )
+          val config = this.config
+          val metadataCache = this.metadataCache
+          new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager, () => metadataCache.metadataVersion(), () => 1) {
+            override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): ReplicaFetcherThread = {
+              val prefix = threadNamePrefix.map(tp => s"$tp:").getOrElse("")
+              val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
+
+              val tp = new TopicPartition(topic, 0)
+              val leader = new MockLeaderEndPoint() {
+                override def fetch(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = {
+                  Map(tp -> new FetchData().setErrorCode(Errors.OFFSET_MOVED_TO_TIERED_STORAGE.code))
+                }
+              }
+              leader.setLeaderState(tp, PartitionState(leaderEpoch = 0))
+              leader.setReplicaPartitionStateCallback(tp => PartitionState(leaderEpoch = 0))
+
+              val fetcher = new ReplicaFetcherThread(threadName, leader, config, failedPartitions, replicaManager,
+                quotaManager, "", () => config.interBrokerProtocolVersion)
+
+              val initialFetchState = InitialFetchState(
+                topicId = Some(Uuid.randomUuid()),
+                leader = leader.brokerEndPoint(),
+                currentLeaderEpoch = 0,
+                initOffset = 0)
+
+              fetcher.addPartitions(Map(tp -> initialFetchState))
+
+              fetcher
+            }
+          }
+        } else {
+          super.createReplicaFetcherManager(
+            metrics,
+            time,
+            threadNamePrefix,
+            quotaManager
+          )

Review Comment:
   These lines duplicate L3224. Could we move them before the if/else block?
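   If it helps, a minimal sketch of what that de-duplication could look like, assuming it sits in the same test override shown in the diff above. `buildTieredStorageFetcherManager` is a hypothetical helper standing in for the anonymous `ReplicaFetcherManager` subclass from the diff, and the first two parameter types follow the usual `createReplicaFetcherManager` signature; this is not code from the PR:
   ```scala
   override def createReplicaFetcherManager(
     metrics: Metrics,
     time: Time,
     threadNamePrefix: Option[String],
     quotaManager: ReplicationQuotaManager
   ): ReplicaFetcherManager = {
     mockReplicaFetcherManager.getOrElse {
       // Call super once, before the if/else, instead of duplicating it in both branches.
       val defaultManager = super.createReplicaFetcherManager(metrics, time, threadNamePrefix, quotaManager)
       if (buildRemoteLogAuxState)
         // hypothetical helper wrapping the custom manager; defaultManager stays unused here,
         // mirroring the current if-branch, which discards the super call's result
         buildTieredStorageFetcherManager(threadNamePrefix, quotaManager)
       else
         defaultManager
     }
   }
   ```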