dimitarndimitrov commented on code in PR #18657:
URL: https://github.com/apache/kafka/pull/18657#discussion_r1925019540


##########
core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala:
##########
@@ -645,6 +646,114 @@ class MetadataCacheTest {
     assertEquals(Seq(expectedNode1), partitionInfo.offlineReplicas.toSeq)
   }
 
+  @ParameterizedTest
+  @MethodSource(Array("cacheProvider"))
+  def testGetPartitionReplicaEndpoints(cache: MetadataCache): Unit = {
+    val securityProtocol = SecurityProtocol.PLAINTEXT
+    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
+
+    // Set up broker data for the metadata cache
+    val numBrokers = 10
+    // Set only the last broker in the list to be offline in order to allow easy
+    // indexing of brokers in the brokerStates list - the index in the list will
+    // be the same as the brokerId of the broker at that position.
+    val offlineBrokerId = numBrokers - 1
+    val brokerStates = (0 until numBrokers - 1).map { brokerId =>
+      new UpdateMetadataBroker()
+        .setId(brokerId)
+        .setRack("rack" + (brokerId % 3))
+        .setEndpoints(
+          Seq(new UpdateMetadataEndpoint()
+            .setHost("foo" + brokerId)
+            .setPort(9092)
+            .setSecurityProtocol(securityProtocol.id)
+            .setListener(listenerName.value)
+          ).asJava)
+    }
+
+    val topic = "many-partitions-topic"
+    val topicId = Uuid.randomUuid()
+
+    // Set up a number of partitions such that each different combination of
+    // $replicationFactor brokers is made a replica set for exactly one partition
+    val replicationFactor = 3
+    val replicaSets = getAllReplicaSets(numBrokers, replicationFactor)
+    val numPartitions = replicaSets.length
+    val partitionStates = (0 until numPartitions).map { partitionId =>
+      val replicas = replicaSets(partitionId)
+      val onlineReplicas = replicas.stream().filter(id => id != offlineBrokerId).collect(Collectors.toList())
+      new UpdateMetadataPartitionState()
+        .setTopicName(topic)
+        .setPartitionIndex(partitionId)
+        .setReplicas(replicas)
+        .setLeader(onlineReplicas.get(0))
+        .setIsr(onlineReplicas)
+        .setOfflineReplicas(Collections.singletonList(offlineBrokerId))
+    }
+
+    // Load the prepared data in the metadata cache
+    val version = ApiKeys.UPDATE_METADATA.latestVersion
+    val controllerId = 0
+    val controllerEpoch = 123
+    val updateMetadataRequest = new UpdateMetadataRequest.Builder(
+      version,
+      controllerId,
+      controllerEpoch,
+      brokerEpoch,
+      partitionStates.asJava,
+      brokerStates.asJava,
+      Collections.singletonMap(topic, topicId)).build()
+    MetadataCacheTest.updateCache(cache, updateMetadataRequest)
+
+    (0 until numPartitions).foreach { partitionId =>
+      val tp = new TopicPartition(topic, partitionId)
+      val brokerIdToNodeMap = cache.getPartitionReplicaEndpoints(tp, listenerName)
+      val replicaSet = brokerIdToNodeMap.keySet
+      val expectedReplicaSet = partitionStates(partitionId).replicas().asScala.toSet
+      // Verify that we have endpoints for exactly the non-fenced brokers of the replica set
+      if (expectedReplicaSet.contains(offlineBrokerId)) {
+        assertEquals(expectedReplicaSet,
+                     replicaSet + offlineBrokerId,
+                     s"Unexpected partial replica set for partition $partitionId")

Review Comment:
   nit: This condition should also assert that `replicaSet` doesn't contain `offlineBrokerId` - as written, the `assertEquals` passes even when it does, because adding `offlineBrokerId` to a set that already contains it is a no-op. Also applies to the original `trunk` change.
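   
   A minimal sketch of the tightened check, assuming JUnit 5's `assertFalse` is already in scope in this test and reusing the names from the diff above:
   
   ```scala
   // Rule out the no-op case explicitly: `replicaSet + offlineBrokerId` alone
   // can't tell whether offlineBrokerId was already present in replicaSet.
   assertFalse(replicaSet.contains(offlineBrokerId),
               s"Unexpected endpoint for offline broker $offlineBrokerId in partition $partitionId")
   assertEquals(expectedReplicaSet,
                replicaSet + offlineBrokerId,
                s"Unexpected partial replica set for partition $partitionId")
   ```
   
   Checking the negative condition first also keeps the failure message specific when the offline broker does leak an endpoint.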


