dajac commented on code in PR #11965:
URL: https://github.com/apache/kafka/pull/11965#discussion_r841436792


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -256,6 +256,7 @@ class Partition(val topicPartition: TopicPartition,
   // start offset for 'leaderEpoch' above (leader epoch of the current leader for this partition),
   // defined when this broker is leader for partition
   @volatile private var leaderEpochStartOffsetOpt: Option[Long] = None
+  // Point to itself if is leader, otherwise point to the latest leader replica

Review Comment:
   nit: I would phrase it as follows: `Replica ID of the leader, defined when this broker is leader or follower for the partition.`



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1234,8 +1234,9 @@ class ReplicaManager(val config: KafkaConfig,
                                        fetchOffset: Long,
                                        currentTimeMs: Long): Option[Int] = {
     partition.leaderReplicaIdOpt.flatMap { leaderReplicaId =>
-      // Don't look up preferred for follower fetches via normal replication
-      if (Request.isValidBrokerId(replicaId))
+      // Don't look up preferred for follower fetches via normal replication and
+      // don't look up preferred read replica while fetch from follower replica
+      if (Request.isValidBrokerId(replicaId) || !partition.isLeader)

Review Comment:
   I wonder if we should add the following method to `Partition` and use it instead of using `leaderReplicaIdOpt`.

   ```scala
   def leaderIdIfLocal: Option[Int] = {
     leaderReplicaIdOpt.filter(_ == localBrokerId)
   }
   ```

   We don't use any lock on this path, so relying solely on `leaderReplicaIdOpt` to make the decision seems slightly better than calling `partition.isLeader`. What do you think?



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3719,3 +3767,19 @@ class ReplicaManagerTest {
     }
   }
 }
+
+class MockReplicaSelector extends ReplicaSelector {
+
+  private val triggerSelectionCount = new AtomicLong()
+
+  def getTriggerSelectionCount: Long = triggerSelectionCount.get
+
+  /**
+   * Select the preferred replica a client should use for fetching. If no replica is available, this will return an
+   * empty optional.
+   */

Review Comment:
   nit: I think that we can remove this default comment.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -1300,6 +1300,54 @@ class ReplicaManagerTest {
     TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }

+  @Test
+  def testFetchFromFollowerShouldNotRunPreferLeaderSelect(): Unit = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
+      propsModifier = props => props.put(KafkaConfig.ReplicaSelectorClassProp, "kafka.server.MockReplicaSelector"))

Review Comment:
   nit: We could use `classOf[MockReplicaSelector].getName`, I think.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3719,3 +3767,19 @@ class ReplicaManagerTest {
     }
   }
 }
+
+class MockReplicaSelector extends ReplicaSelector {
+
+  private val triggerSelectionCount = new AtomicLong()
+
+  def getTriggerSelectionCount: Long = triggerSelectionCount.get

Review Comment:
   nit: The `trigger` in the name bugs me. We could perhaps just use `selectionCount`?
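
Regarding the `leaderIdIfLocal` suggestion for `ReplicaManager.scala` above, here is a minimal, self-contained sketch of how that check might be used. It is not the actual Kafka code: `PartitionSketch`, `PreferredReplicaCheck`, `updateLeader`, and `shouldSelectPreferredReplica` are hypothetical names, and `isValidBrokerId` is a simplified stand-in for `Request.isValidBrokerId`, assumed here to treat non-negative IDs as broker IDs.

```scala
// Hypothetical, simplified stand-in for kafka.cluster.Partition (not the real class).
final class PartitionSketch(localBrokerId: Int) {
  // Replica ID of the leader, defined when this broker is leader or follower for the partition.
  @volatile private var leaderReplicaIdOpt: Option[Int] = None

  // Hypothetical setter, only here so the sketch can change leadership.
  def updateLeader(leaderId: Option[Int]): Unit = {
    leaderReplicaIdOpt = leaderId
  }

  // The method suggested in the review: defined only when the recorded leader is this broker.
  def leaderIdIfLocal: Option[Int] = {
    leaderReplicaIdOpt.filter(_ == localBrokerId)
  }
}

object PreferredReplicaCheck {
  // Assumption: stand-in for Request.isValidBrokerId; consumers fetch with a negative replica ID.
  def isValidBrokerId(replicaId: Int): Boolean = replicaId >= 0

  // True only when this broker is the leader and the fetch does not come from a follower replica.
  def shouldSelectPreferredReplica(partition: PartitionSketch, replicaId: Int): Boolean =
    partition.leaderIdIfLocal.exists(_ => !isValidBrokerId(replicaId))
}
```

With this shape, the "am I the leader" decision comes from one read of the volatile field, which seems to be the point of preferring it over a separate `partition.isLeader` call on a path that holds no lock.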