dajac commented on code in PR #12877:
URL: https://github.com/apache/kafka/pull/12877#discussion_r1028216090


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1244,8 +1244,12 @@ class ReplicaManager(val config: KafkaConfig,
 
           partition.remoteReplicas.foreach { replica =>
             val replicaState = replica.stateSnapshot
-            // Exclude replicas that don't have the requested offset (whether 
or not if they're in the ISR)
-            if (replicaState.logEndOffset >= fetchOffset && 
replicaState.logStartOffset <= fetchOffset) {
+            // Exclude replicas that don't have the requested offset. Also 
exclude replicas that are not
+            // in the ISR as the follower may lag behind indefinitely.
+            if (replicaState.logEndOffset >= fetchOffset &&
+              replicaState.logStartOffset <= fetchOffset &&
+              partition.inSyncReplicaIds.contains(replica.brokerId)

Review Comment:
   nit: Could we align these two lines with `replicaState` on the previous line?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1244,8 +1244,12 @@ class ReplicaManager(val config: KafkaConfig,
 
           partition.remoteReplicas.foreach { replica =>
             val replicaState = replica.stateSnapshot
-            // Exclude replicas that don't have the requested offset (whether 
or not if they're in the ISR)
-            if (replicaState.logEndOffset >= fetchOffset && 
replicaState.logStartOffset <= fetchOffset) {
+            // Exclude replicas that don't have the requested offset. Also 
exclude replicas that are not
+            // in the ISR as the follower may lag behind indefinitely.
+            if (replicaState.logEndOffset >= fetchOffset &&
+              replicaState.logStartOffset <= fetchOffset &&
+              partition.inSyncReplicaIds.contains(replica.brokerId)
+            ) {

Review Comment:
   nit: We usually put the closing parenthesis of an `if` statement on the 
previous line.



##########
core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala:
##########
@@ -84,9 +84,58 @@ class FetchFromFollowerIntegrationTest extends 
BaseFetchRequestTest {
       TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 1)
       val response = receive[FetchResponse](socket, ApiKeys.FETCH, version)
       assertEquals(Errors.NONE, response.error)
-      assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts())
+      assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
     } finally {
       socket.close()
     }
   }
+
+  @Test
+  def testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable(): Unit = {
+    // Create a topic with 2 replicas where broker 0 is the leader and 1 is 
the follower.
+    val admin = createAdminClient()
+    TestUtils.createTopicWithAdmin(
+      admin,
+      topic,
+      brokers,
+      replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId))
+    )
+
+    TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 10)
+
+    val topicPartition = new TopicPartition(topic, 0)
+    val offsetMap = Map(topicPartition -> 10L)
+
+    val request = createConsumerFetchRequest(
+      maxResponseBytes = 1000,
+      maxPartitionBytes = 1000,
+      Seq(topicPartition),
+      offsetMap,
+      ApiKeys.FETCH.latestVersion,
+      maxWaitMs = 20000,
+      minBytes = 1,
+      rackId = followerBrokerId.toString
+    )
+    var response = connectAndReceive[FetchResponse](request, 
brokers(leaderBrokerId).socketServer)
+    assertEquals(Errors.NONE, response.error)
+    assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
+    validateFetchResponse(response, preferredReadReplica = 1)
+
+    // Shutdown follower broker. Consumer will reach out to leader after 
metadata.max.age.ms
+    brokers(followerBrokerId).shutdown()
+    response = connectAndReceive[FetchResponse](request, 
brokers(leaderBrokerId).socketServer)
+    assertEquals(Errors.NONE, response.error)
+    assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
+    validateFetchResponse(response)
+  }
+
+  private def validateFetchResponse(response: FetchResponse, 
preferredReadReplica: Int = -1): Unit = {

Review Comment:
   nit: `validatePreferredReadReplica`?



##########
core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala:
##########
@@ -84,9 +84,58 @@ class FetchFromFollowerIntegrationTest extends 
BaseFetchRequestTest {
       TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 1)
       val response = receive[FetchResponse](socket, ApiKeys.FETCH, version)
       assertEquals(Errors.NONE, response.error)
-      assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts())
+      assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
     } finally {
       socket.close()
     }
   }
+
+  @Test
+  def testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable(): Unit = {
+    // Create a topic with 2 replicas where broker 0 is the leader and 1 is 
the follower.
+    val admin = createAdminClient()
+    TestUtils.createTopicWithAdmin(
+      admin,
+      topic,
+      brokers,
+      replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId))
+    )
+
+    TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 10)
+
+    val topicPartition = new TopicPartition(topic, 0)
+    val offsetMap = Map(topicPartition -> 10L)
+
+    val request = createConsumerFetchRequest(
+      maxResponseBytes = 1000,
+      maxPartitionBytes = 1000,
+      Seq(topicPartition),
+      offsetMap,
+      ApiKeys.FETCH.latestVersion,
+      maxWaitMs = 20000,
+      minBytes = 1,
+      rackId = followerBrokerId.toString
+    )
+    var response = connectAndReceive[FetchResponse](request, 
brokers(leaderBrokerId).socketServer)
+    assertEquals(Errors.NONE, response.error)
+    assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
+    validateFetchResponse(response, preferredReadReplica = 1)
+
+    // Shutdown follower broker. Consumer will reach out to leader after 
metadata.max.age.ms
+    brokers(followerBrokerId).shutdown()

Review Comment:
   Is it guaranteed that the follower is removed from the ISR after this line, 
or do we have a race condition with the checks below?



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -4174,11 +4244,14 @@ class ReplicaManagerTest {
 class MockReplicaSelector extends ReplicaSelector {
 
   private val selectionCount = new AtomicLong()
+  private var partitionViewArgument: Option[PartitionView] = None
 
   def getSelectionCount: Long = selectionCount.get
+  def getPartitionViewArgument(): Option[PartitionView] = partitionViewArgument

Review Comment:
   nit: We usually don't prefix getters with `get`. Could we rename these?



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -1279,6 +1280,75 @@ class ReplicaManagerTest {
     TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
+  @Test
+  def testPreferredReplicaAsLeaderWhenSameRackFollowerIsOutOfIsr(): Unit = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time),
+      propsModifier = props => props.put(KafkaConfig.ReplicaSelectorClassProp, 
classOf[MockReplicaSelector].getName))
+
+    try {
+      val leaderBrokerId = 0
+      val followerBrokerId = 1
+      val leaderNode = new Node(leaderBrokerId, "host1", 0, "rack-a")
+      val followerNode = new Node(followerBrokerId, "host2", 1, "rack-b")
+      val brokerList = Seq[Integer](leaderBrokerId, followerBrokerId).asJava
+      val topicId = Uuid.randomUuid()
+      val tp0 = new TopicPartition(topic, 0)
+      val tidp0 = new TopicIdPartition(topicId, tp0)
+
+      when(replicaManager.metadataCache.getPartitionReplicaEndpoints(
+        tp0,
+        new ListenerName("default")
+      )).thenReturn(Map(
+        leaderBrokerId -> leaderNode,
+        followerBrokerId -> followerNode
+      ).toMap)
+
+      // Make this replica the leader and remove follower from ISR.
+      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(
+        ApiKeys.LEADER_AND_ISR.latestVersion,
+        0,
+        0,
+        brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(leaderBrokerId)
+          .setLeaderEpoch(1)
+          .setIsr(Seq[Integer](leaderBrokerId).asJava)
+          .setPartitionEpoch(0)
+          .setReplicas(brokerList)
+          .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, topicId),
+        Set(leaderNode, followerNode).asJava).build()
+
+      replicaManager.becomeLeaderOrFollower(2, leaderAndIsrRequest, (_, _) => 
())
+
+      val metadata = new DefaultClientMetadata("rack-b", "client-id",
+        InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, 
"default")
+
+      val consumerResult = fetchPartitionAsConsumer(
+        replicaManager,
+        tidp0,
+        new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()),
+        clientMetadata = Some(metadata)
+      )
+
+      // Fetch from leader succeeds
+      assertTrue(consumerResult.hasFired)
+
+      // PartitionView passed to ReplicaSelector should not contain the 
follower as it's not in the ISR
+      val expectedReplicaViews = Set(new DefaultReplicaView(leaderNode, 0, 0))
+      val partitionView = replicaManager.replicaSelectorOpt.get
+        .asInstanceOf[MockReplicaSelector].getPartitionViewArgument()
+
+      assertTrue(partitionView.isDefined)
+      assertEquals(expectedReplicaViews.asJava, partitionView.get.replicas())

Review Comment:
   nit: You can remove `()` after `replicas`.



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1244,8 +1244,12 @@ class ReplicaManager(val config: KafkaConfig,
 
           partition.remoteReplicas.foreach { replica =>
             val replicaState = replica.stateSnapshot
-            // Exclude replicas that don't have the requested offset (whether 
or not if they're in the ISR)
-            if (replicaState.logEndOffset >= fetchOffset && 
replicaState.logStartOffset <= fetchOffset) {
+            // Exclude replicas that don't have the requested offset. Also 
exclude replicas that are not
+            // in the ISR as the follower may lag behind indefinitely.
+            if (replicaState.logEndOffset >= fetchOffset &&

Review Comment:
   Note that this is implicit if the replica is in the ISR but it does not cost 
much. Should we put the ISR check first? It seems to me that this is the most 
important one.



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1244,8 +1244,12 @@ class ReplicaManager(val config: KafkaConfig,
 
           partition.remoteReplicas.foreach { replica =>
             val replicaState = replica.stateSnapshot
-            // Exclude replicas that don't have the requested offset (whether 
or not if they're in the ISR)
-            if (replicaState.logEndOffset >= fetchOffset && 
replicaState.logStartOffset <= fetchOffset) {
+            // Exclude replicas that don't have the requested offset. Also 
exclude replicas that are not
+            // in the ISR as the follower may lag behind indefinitely.

Review Comment:
   nit: Should we explain the impact of a follower lagging indefinitely on 
consumers fetching from it?



##########
core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala:
##########
@@ -84,9 +84,58 @@ class FetchFromFollowerIntegrationTest extends 
BaseFetchRequestTest {
       TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 1)
       val response = receive[FetchResponse](socket, ApiKeys.FETCH, version)
       assertEquals(Errors.NONE, response.error)
-      assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts())
+      assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
     } finally {
       socket.close()
     }
   }
+
+  @Test
+  def testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable(): Unit = {
+    // Create a topic with 2 replicas where broker 0 is the leader and 1 is 
the follower.
+    val admin = createAdminClient()
+    TestUtils.createTopicWithAdmin(
+      admin,
+      topic,
+      brokers,
+      replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId))
+    )
+
+    TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 10)
+
+    val topicPartition = new TopicPartition(topic, 0)
+    val offsetMap = Map(topicPartition -> 10L)
+
+    val request = createConsumerFetchRequest(
+      maxResponseBytes = 1000,
+      maxPartitionBytes = 1000,
+      Seq(topicPartition),
+      offsetMap,
+      ApiKeys.FETCH.latestVersion,
+      maxWaitMs = 20000,
+      minBytes = 1,
+      rackId = followerBrokerId.toString
+    )
+    var response = connectAndReceive[FetchResponse](request, 
brokers(leaderBrokerId).socketServer)
+    assertEquals(Errors.NONE, response.error)
+    assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
+    validateFetchResponse(response, preferredReadReplica = 1)
+
+    // Shutdown follower broker. Consumer will reach out to leader after 
metadata.max.age.ms
+    brokers(followerBrokerId).shutdown()
+    response = connectAndReceive[FetchResponse](request, 
brokers(leaderBrokerId).socketServer)
+    assertEquals(Errors.NONE, response.error)
+    assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
+    validateFetchResponse(response)

Review Comment:
   nit: I would rather put `preferredReadReplica = -1` here. It makes the test 
more readable.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -1279,6 +1280,75 @@ class ReplicaManagerTest {
     TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
+  @Test
+  def testPreferredReplicaAsLeaderWhenSameRackFollowerIsOutOfIsr(): Unit = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time),
+      propsModifier = props => props.put(KafkaConfig.ReplicaSelectorClassProp, 
classOf[MockReplicaSelector].getName))
+
+    try {
+      val leaderBrokerId = 0
+      val followerBrokerId = 1
+      val leaderNode = new Node(leaderBrokerId, "host1", 0, "rack-a")
+      val followerNode = new Node(followerBrokerId, "host2", 1, "rack-b")
+      val brokerList = Seq[Integer](leaderBrokerId, followerBrokerId).asJava
+      val topicId = Uuid.randomUuid()
+      val tp0 = new TopicPartition(topic, 0)
+      val tidp0 = new TopicIdPartition(topicId, tp0)
+
+      when(replicaManager.metadataCache.getPartitionReplicaEndpoints(
+        tp0,
+        new ListenerName("default")
+      )).thenReturn(Map(
+        leaderBrokerId -> leaderNode,
+        followerBrokerId -> followerNode
+      ).toMap)
+
+      // Make this replica the leader and remove follower from ISR.
+      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(
+        ApiKeys.LEADER_AND_ISR.latestVersion,
+        0,
+        0,
+        brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(leaderBrokerId)
+          .setLeaderEpoch(1)
+          .setIsr(Seq[Integer](leaderBrokerId).asJava)
+          .setPartitionEpoch(0)
+          .setReplicas(brokerList)
+          .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, topicId),
+        Set(leaderNode, followerNode).asJava).build()
+
+      replicaManager.becomeLeaderOrFollower(2, leaderAndIsrRequest, (_, _) => 
())
+
+      val metadata = new DefaultClientMetadata("rack-b", "client-id",
+        InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, 
"default")
+
+      val consumerResult = fetchPartitionAsConsumer(
+        replicaManager,
+        tidp0,
+        new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()),
+        clientMetadata = Some(metadata)
+      )
+
+      // Fetch from leader succeeds
+      assertTrue(consumerResult.hasFired)

Review Comment:
   Should we assert the returned preferred replica?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to