frankvicky commented on code in PR #17646: URL: https://github.com/apache/kafka/pull/17646#discussion_r1824298235
########## server-common/src/test/java/org/apache/kafka/admin/AdminRackAwareTest.java: ########## @@ -0,0 +1,540 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.admin; + +import org.apache.kafka.common.errors.InvalidPartitionsException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.server.common.AdminOperationException; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class AdminRackAwareTest { + + private static Collection<BrokerMetadata> toBrokerMetadata(Map<Integer, String> rackMap) { + List<BrokerMetadata> brokerMetadatas = rackMap.entrySet().stream() + .map(entry -> new BrokerMetadata(entry.getKey(), Optional.ofNullable(entry.getValue()))) + 
.collect(Collectors.toList()); + + brokerMetadatas.sort(Comparator.comparingInt(b -> b.id)); + + return brokerMetadatas; + } + + private static ReplicaDistributions getReplicaDistribution(Map<Integer, List<Integer>> assignment, Map<Integer, String> brokerRackMapping) { + Map<Integer, Integer> leaderCount = new HashMap<>(); + Map<Integer, Integer> partitionCount = new HashMap<>(); + Map<Integer, List<String>> partitionRackMap = new HashMap<>(); + + for (Map.Entry<Integer, List<Integer>> entry : assignment.entrySet()) { + Integer partitionId = entry.getKey(); + List<Integer> replicaList = entry.getValue(); + + // The leader is the first broker in the list + Integer leader = replicaList.get(0); + leaderCount.put(leader, leaderCount.getOrDefault(leader, 0) + 1); + + for (Integer brokerId : replicaList) { + partitionCount.put(brokerId, partitionCount.getOrDefault(brokerId, 0) + 1); Review Comment: Ditto: we could use the `merge` method here as well, e.g. `partitionCount.merge(brokerId, 1, Integer::sum)`. ########## server-common/src/test/java/org/apache/kafka/admin/AdminRackAwareTest.java: ########## @@ -0,0 +1,540 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.admin; + +import org.apache.kafka.common.errors.InvalidPartitionsException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.server.common.AdminOperationException; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class AdminRackAwareTest { + + private static Collection<BrokerMetadata> toBrokerMetadata(Map<Integer, String> rackMap) { + List<BrokerMetadata> brokerMetadatas = rackMap.entrySet().stream() + .map(entry -> new BrokerMetadata(entry.getKey(), Optional.ofNullable(entry.getValue()))) + .collect(Collectors.toList()); + + brokerMetadatas.sort(Comparator.comparingInt(b -> b.id)); + + return brokerMetadatas; + } + + private static ReplicaDistributions getReplicaDistribution(Map<Integer, List<Integer>> assignment, Map<Integer, String> brokerRackMapping) { + Map<Integer, Integer> leaderCount = new HashMap<>(); + Map<Integer, Integer> partitionCount = new HashMap<>(); + Map<Integer, List<String>> partitionRackMap = new HashMap<>(); + + for (Map.Entry<Integer, List<Integer>> entry : assignment.entrySet()) { + Integer partitionId = entry.getKey(); + List<Integer> replicaList = entry.getValue(); + + // The leader is the first broker in the list + Integer leader = replicaList.get(0); + leaderCount.put(leader, leaderCount.getOrDefault(leader, 0) + 1); Review Comment: We could use the `merge` method here, e.g. `leaderCount.merge(leader, 1, Integer::sum)`. 
########## server-common/src/test/java/org/apache/kafka/admin/AdminRackAwareTest.java: ########## @@ -0,0 +1,540 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.admin; + +import org.apache.kafka.common.errors.InvalidPartitionsException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.server.common.AdminOperationException; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class AdminRackAwareTest { + + private static Collection<BrokerMetadata> toBrokerMetadata(Map<Integer, String> rackMap) { + List<BrokerMetadata> brokerMetadatas = rackMap.entrySet().stream() + .map(entry -> new BrokerMetadata(entry.getKey(), Optional.ofNullable(entry.getValue()))) + 
.collect(Collectors.toList()); + + brokerMetadatas.sort(Comparator.comparingInt(b -> b.id)); + + return brokerMetadatas; + } + + private static ReplicaDistributions getReplicaDistribution(Map<Integer, List<Integer>> assignment, Map<Integer, String> brokerRackMapping) { + Map<Integer, Integer> leaderCount = new HashMap<>(); + Map<Integer, Integer> partitionCount = new HashMap<>(); + Map<Integer, List<String>> partitionRackMap = new HashMap<>(); + + for (Map.Entry<Integer, List<Integer>> entry : assignment.entrySet()) { + Integer partitionId = entry.getKey(); + List<Integer> replicaList = entry.getValue(); + + // The leader is the first broker in the list + Integer leader = replicaList.get(0); + leaderCount.put(leader, leaderCount.getOrDefault(leader, 0) + 1); + + for (Integer brokerId : replicaList) { + partitionCount.put(brokerId, partitionCount.getOrDefault(brokerId, 0) + 1); + + String rack = brokerRackMapping.get(brokerId); + + // Add the rack to the list of racks for this partition + partitionRackMap.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(rack); + } + } + + return new ReplicaDistributions(partitionRackMap, leaderCount, partitionCount); + } + + public static void assertReplicaDistribution( + Map<Integer, List<Integer>> assignment, + Map<Integer, String> brokerRackMapping, + int numBrokers, + int numPartitions, + int replicationFactor, + boolean verifyRackAware, + boolean verifyLeaderDistribution, + boolean verifyReplicasDistribution) { + + for (Map.Entry<Integer, List<Integer>> entry : assignment.entrySet()) { + List<Integer> brokerList = entry.getValue(); + Set<Integer> brokerSet = new HashSet<>(brokerList); + assertEquals(brokerSet.size(), brokerList.size()); + } + + ReplicaDistributions distribution = getReplicaDistribution(assignment, brokerRackMapping); + + // Verify Rack-Awareness + if (verifyRackAware) { + Map<Integer, List<String>> partitionRackMap = distribution.getPartitionRacks(); + List<Integer> distinctRackSizes = 
partitionRackMap.values().stream() + .map(rackList -> (int) rackList.stream().distinct().count()) + .collect(Collectors.toList()); + + List<Integer> expectedRackSizes = Collections.nCopies(numPartitions, replicationFactor); + assertEquals(expectedRackSizes, distinctRackSizes); + } + + // Verify Leader Distribution + if (verifyLeaderDistribution) { + Map<Integer, Integer> leaderCount = distribution.getBrokerLeaderCount(); + int leaderCountPerBroker = numPartitions / numBrokers; + List<Integer> expectedLeaderCounts = Collections.nCopies(numBrokers, leaderCountPerBroker); + List<Integer> actualLeaderCounts = new ArrayList<>(leaderCount.values()); + assertEquals(expectedLeaderCounts, actualLeaderCounts); + } + + // Verify Replicas Distribution + if (verifyReplicasDistribution) { + Map<Integer, Integer> replicasCount = distribution.getBrokerReplicasCount(); + int numReplicasPerBroker = (numPartitions * replicationFactor) / numBrokers; + List<Integer> expectedReplicasCounts = Collections.nCopies(numBrokers, numReplicasPerBroker); + List<Integer> actualReplicasCounts = new ArrayList<>(replicasCount.values()); + assertEquals(expectedReplicasCounts, actualReplicasCounts); + } + } + + @Test + public void testGetRackAlternatedBrokerListAndAssignReplicasToBrokers() { + Map<Integer, String> rackMap = Map.of( + 0, "rack1", + 1, "rack3", + 2, "rack3", + 3, "rack2", + 4, "rack2", + 5, "rack1" + ); + + List<Integer> newList = AdminUtils.getRackAlternatedBrokerList(rackMap); + assertEquals(Arrays.asList(0, 3, 1, 5, 4, 2), newList); + + Map<Integer, String> anotherRackMap = new HashMap<>(rackMap); + anotherRackMap.remove(5); + List<Integer> anotherList = AdminUtils.getRackAlternatedBrokerList(anotherRackMap); + assertEquals(Arrays.asList(0, 3, 1, 4, 2), anotherList); + + Map<Integer, List<Integer>> assignment = AdminUtils. 
+ assignReplicasToBrokers(toBrokerMetadata(rackMap), 7, 3, 0, 0); + + + Map<Integer, List<Integer>> expected = Map.of( + 0, Arrays.asList(0, 3, 1), + 1, Arrays.asList(3, 1, 5), + 2, Arrays.asList(1, 5, 4), + 3, Arrays.asList(5, 4, 2), + 4, Arrays.asList(4, 2, 0), + 5, Arrays.asList(2, 0, 3), + 6, Arrays.asList(0, 4, 2) + ); + + assertEquals(expected, assignment); + } + + @Test + public void testAssignmentWithRackAware() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack1" + ); + + int numPartitions = 6; + int replicationFactor = 3; + Map<Integer, List<Integer>> assignment = AdminUtils. + assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor, 2, 0); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testAssignmentWithRackAwareWithRandomStartIndex() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack1" + ); + + int numPartitions = 6; + int replicationFactor = 3; + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testAssignmentWithRackAwareWithUnevenReplicas() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack1" + ); + + int numPartitions = 13; + int replicationFactor = 3; + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor, 0, 0); + + assertReplicaDistribution(assignment, brokerRackMapping, 
brokerRackMapping.size(), + numPartitions, replicationFactor, true, false, false); + } + + @Test + public void testAssignmentWithRackAwareWithUnevenRacks() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack1", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack1" + ); + + int numPartitions = 12; + int replicationFactor = 3; + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, false); + } + + @Test + public void testAssignmentWith2ReplicasRackAware() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack1" + ); + + int numPartitions = 12; + int replicationFactor = 2; + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testRackAwareExpansion() { + Map<Integer, String> brokerRackMapping = Map.of( + 6, "rack1", + 7, "rack2", + 8, "rack2", + 9, "rack3", + 10, "rack3", + 11, "rack1" + ); + + int numPartitions = 12; + int replicationFactor = 2; + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor, -1, 12); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testAssignmentWith2ReplicasRackAwareWith6Partitions() { + // Define the brokerRackMapping in Java + Map<Integer, String> brokerRackMapping = Map.of( + 6, "rack1", + 7, "rack2", + 8, "rack2", + 9, "rack3", + 
10, "rack3", + 11, "rack1" + ); + + int numPartitions = 6; + int replicationFactor = 2; + + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testAssignmentWith2ReplicasRackAwareWith6PartitionsAnd3Brokers() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 4, "rack3" + ); + + int numPartitions = 3; + int replicationFactor = 2; + + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testLargeNumberPartitionsAssignment() { + Map<Integer, String> brokerRackMapping = new HashMap<>(); + brokerRackMapping.put(0, "rack1"); + brokerRackMapping.put(1, "rack2"); + brokerRackMapping.put(2, "rack2"); + brokerRackMapping.put(3, "rack3"); + brokerRackMapping.put(4, "rack3"); + brokerRackMapping.put(5, "rack1"); + brokerRackMapping.put(6, "rack1"); + brokerRackMapping.put(7, "rack2"); + brokerRackMapping.put(8, "rack2"); + brokerRackMapping.put(9, "rack3"); + brokerRackMapping.put(10, "rack1"); + brokerRackMapping.put(11, "rack3"); + + int numPartitions = 96; + int replicationFactor = 3; + + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testMoreReplicasThanRacks() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 2, 
"rack2", + 3, "rack3", + 4, "rack3", + 5, "rack2" + ); + + int numPartitions = 6; + int replicationFactor = 5; + + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + for (List<Integer> replicas : assignment.values()) { + assertEquals(replicationFactor, replicas.size()); + } + + ReplicaDistributions distribution = getReplicaDistribution(assignment, brokerRackMapping); + + // Verify that each partition is spread across exactly 3 unique racks + for (int partition = 0; partition < numPartitions; partition++) { + List<String> racks = distribution.getPartitionRacks().get(partition); + long distinctRackCount = racks.stream().distinct().count(); + assertEquals(3, distinctRackCount); + } + } + + @Test + public void testLessReplicasThanRacks() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack2" + ); + + int numPartitions = 6; + int replicationFactor = 2; + + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + // Assert that each partition is assigned to the correct number of replicas + for (List<Integer> replicas : assignment.values()) { + assertEquals(replicationFactor, replicas.size()); + } + + // Get the replica distribution + ReplicaDistributions distribution = getReplicaDistribution(assignment, brokerRackMapping); + + // Verify that each partition is spread across exactly 2 unique racks + for (int partition = 0; partition <= 5; partition++) { + List<String> racks = distribution.getPartitionRacks().get(partition); + long distinctRackCount = racks.stream().distinct().count(); + assertEquals(2, distinctRackCount); + } + + } + + @Test + public void testSingleRack() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack1", + 2, "rack1", + 3, "rack1", + 4, "rack1", + 5, 
"rack1" + ); + + int numPartitions = 6; + int replicationFactor = 3; + + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + for (List<Integer> replicas : assignment.values()) { + assertEquals(replicationFactor, replicas.size()); + } + + ReplicaDistributions distribution = getReplicaDistribution(assignment, brokerRackMapping); + + for (int partition = 0; partition < numPartitions; partition++) { + List<String> racks = distribution.getPartitionRacks().get(partition); + long distinctRackCount = racks.stream().distinct().count(); + assertEquals(1, distinctRackCount); + } + + for (Integer broker : brokerRackMapping.keySet()) { + int leaderCount = distribution.getBrokerLeaderCount().getOrDefault(broker, 0); + assertEquals(1, leaderCount); + } + + } + + @Test + public void testSkipBrokerWithReplicaAlreadyAssigned() { + Map<Integer, String> rackInfo = Map.of( + 0, "a", + 1, "b", + 2, "c", + 3, "a", + 4, "a" + ); + + List<Integer> brokerList = IntStream.rangeClosed(0, 4).boxed().collect(Collectors.toList()); + + int numPartitions = 6; + int replicationFactor = 4; + + List<BrokerMetadata> brokerMetadatas = toBrokerMetadata(rackInfo).stream().collect(Collectors.toList()); Review Comment: We could leverage constructor of `ArrayList`, it's more clear. ########## server-common/src/test/java/org/apache/kafka/admin/AdminRackAwareTest.java: ########## @@ -0,0 +1,540 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.admin; + +import org.apache.kafka.common.errors.InvalidPartitionsException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.server.common.AdminOperationException; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class AdminRackAwareTest { + + private static Collection<BrokerMetadata> toBrokerMetadata(Map<Integer, String> rackMap) { + List<BrokerMetadata> brokerMetadatas = rackMap.entrySet().stream() + .map(entry -> new BrokerMetadata(entry.getKey(), Optional.ofNullable(entry.getValue()))) + .collect(Collectors.toList()); + + brokerMetadatas.sort(Comparator.comparingInt(b -> b.id)); Review Comment: We could sort `brokerMetadatas` inside the stream pipeline with `sorted(...)` instead of sorting the collected list afterwards. ########## server-common/src/test/java/org/apache/kafka/admin/AdminRackAwareTest.java: ########## @@ -0,0 +1,540 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.admin; + +import org.apache.kafka.common.errors.InvalidPartitionsException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.server.common.AdminOperationException; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class AdminRackAwareTest { + + private static Collection<BrokerMetadata> toBrokerMetadata(Map<Integer, String> rackMap) { + List<BrokerMetadata> brokerMetadatas = rackMap.entrySet().stream() + .map(entry -> new BrokerMetadata(entry.getKey(), Optional.ofNullable(entry.getValue()))) + .collect(Collectors.toList()); + + brokerMetadatas.sort(Comparator.comparingInt(b -> b.id)); + + return brokerMetadatas; + } + + private static ReplicaDistributions getReplicaDistribution(Map<Integer, List<Integer>> assignment, Map<Integer, String> brokerRackMapping) { + Map<Integer, Integer> leaderCount = new HashMap<>(); + 
Map<Integer, Integer> partitionCount = new HashMap<>(); + Map<Integer, List<String>> partitionRackMap = new HashMap<>(); + + for (Map.Entry<Integer, List<Integer>> entry : assignment.entrySet()) { + Integer partitionId = entry.getKey(); + List<Integer> replicaList = entry.getValue(); + + // The leader is the first broker in the list + Integer leader = replicaList.get(0); + leaderCount.put(leader, leaderCount.getOrDefault(leader, 0) + 1); + + for (Integer brokerId : replicaList) { + partitionCount.put(brokerId, partitionCount.getOrDefault(brokerId, 0) + 1); + + String rack = brokerRackMapping.get(brokerId); + + // Add the rack to the list of racks for this partition + partitionRackMap.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(rack); Review Comment: There is no need for a separate variable, since `rack` is only used once. ########## server-common/src/test/java/org/apache/kafka/admin/AdminRackAwareTest.java: ########## @@ -0,0 +1,540 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.admin; + +import org.apache.kafka.common.errors.InvalidPartitionsException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.server.common.AdminOperationException; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class AdminRackAwareTest { + + private static Collection<BrokerMetadata> toBrokerMetadata(Map<Integer, String> rackMap) { + List<BrokerMetadata> brokerMetadatas = rackMap.entrySet().stream() + .map(entry -> new BrokerMetadata(entry.getKey(), Optional.ofNullable(entry.getValue()))) + .collect(Collectors.toList()); + + brokerMetadatas.sort(Comparator.comparingInt(b -> b.id)); + + return brokerMetadatas; + } + + private static ReplicaDistributions getReplicaDistribution(Map<Integer, List<Integer>> assignment, Map<Integer, String> brokerRackMapping) { + Map<Integer, Integer> leaderCount = new HashMap<>(); + Map<Integer, Integer> partitionCount = new HashMap<>(); + Map<Integer, List<String>> partitionRackMap = new HashMap<>(); + + for (Map.Entry<Integer, List<Integer>> entry : assignment.entrySet()) { + Integer partitionId = entry.getKey(); + List<Integer> replicaList = entry.getValue(); + + // The leader is the first broker in the list + Integer leader = replicaList.get(0); + leaderCount.put(leader, leaderCount.getOrDefault(leader, 0) + 1); + + for (Integer brokerId : replicaList) { + partitionCount.put(brokerId, partitionCount.getOrDefault(brokerId, 0) + 1); + + String rack = 
brokerRackMapping.get(brokerId); + + // Add the rack to the list of racks for this partition + partitionRackMap.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(rack); + } + } + + return new ReplicaDistributions(partitionRackMap, leaderCount, partitionCount); + } + + public static void assertReplicaDistribution( + Map<Integer, List<Integer>> assignment, + Map<Integer, String> brokerRackMapping, + int numBrokers, + int numPartitions, + int replicationFactor, + boolean verifyRackAware, + boolean verifyLeaderDistribution, + boolean verifyReplicasDistribution) { + + for (Map.Entry<Integer, List<Integer>> entry : assignment.entrySet()) { + List<Integer> brokerList = entry.getValue(); + Set<Integer> brokerSet = new HashSet<>(brokerList); + assertEquals(brokerSet.size(), brokerList.size()); + } + + ReplicaDistributions distribution = getReplicaDistribution(assignment, brokerRackMapping); + + // Verify Rack-Awareness + if (verifyRackAware) { + Map<Integer, List<String>> partitionRackMap = distribution.getPartitionRacks(); + List<Integer> distinctRackSizes = partitionRackMap.values().stream() + .map(rackList -> (int) rackList.stream().distinct().count()) + .collect(Collectors.toList()); + + List<Integer> expectedRackSizes = Collections.nCopies(numPartitions, replicationFactor); + assertEquals(expectedRackSizes, distinctRackSizes); + } + + // Verify Leader Distribution + if (verifyLeaderDistribution) { + Map<Integer, Integer> leaderCount = distribution.getBrokerLeaderCount(); + int leaderCountPerBroker = numPartitions / numBrokers; + List<Integer> expectedLeaderCounts = Collections.nCopies(numBrokers, leaderCountPerBroker); + List<Integer> actualLeaderCounts = new ArrayList<>(leaderCount.values()); + assertEquals(expectedLeaderCounts, actualLeaderCounts); + } + + // Verify Replicas Distribution + if (verifyReplicasDistribution) { + Map<Integer, Integer> replicasCount = distribution.getBrokerReplicasCount(); + int numReplicasPerBroker = (numPartitions * 
replicationFactor) / numBrokers; + List<Integer> expectedReplicasCounts = Collections.nCopies(numBrokers, numReplicasPerBroker); + List<Integer> actualReplicasCounts = new ArrayList<>(replicasCount.values()); + assertEquals(expectedReplicasCounts, actualReplicasCounts); + } + } + + @Test + public void testGetRackAlternatedBrokerListAndAssignReplicasToBrokers() { + Map<Integer, String> rackMap = Map.of( + 0, "rack1", + 1, "rack3", + 2, "rack3", + 3, "rack2", + 4, "rack2", + 5, "rack1" + ); + + List<Integer> newList = AdminUtils.getRackAlternatedBrokerList(rackMap); + assertEquals(Arrays.asList(0, 3, 1, 5, 4, 2), newList); + + Map<Integer, String> anotherRackMap = new HashMap<>(rackMap); + anotherRackMap.remove(5); + List<Integer> anotherList = AdminUtils.getRackAlternatedBrokerList(anotherRackMap); + assertEquals(Arrays.asList(0, 3, 1, 4, 2), anotherList); + + Map<Integer, List<Integer>> assignment = AdminUtils. + assignReplicasToBrokers(toBrokerMetadata(rackMap), 7, 3, 0, 0); + + + Map<Integer, List<Integer>> expected = Map.of( + 0, Arrays.asList(0, 3, 1), + 1, Arrays.asList(3, 1, 5), + 2, Arrays.asList(1, 5, 4), + 3, Arrays.asList(5, 4, 2), + 4, Arrays.asList(4, 2, 0), + 5, Arrays.asList(2, 0, 3), + 6, Arrays.asList(0, 4, 2) + ); + + assertEquals(expected, assignment); + } + + @Test + public void testAssignmentWithRackAware() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack1" + ); + + int numPartitions = 6; + int replicationFactor = 3; + Map<Integer, List<Integer>> assignment = AdminUtils. 
+ assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor, 2, 0); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testAssignmentWithRackAwareWithRandomStartIndex() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack1" + ); + + int numPartitions = 6; + int replicationFactor = 3; + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testAssignmentWithRackAwareWithUnevenReplicas() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack1" + ); + + int numPartitions = 13; + int replicationFactor = 3; + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor, 0, 0); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, false, false); + } + + @Test + public void testAssignmentWithRackAwareWithUnevenRacks() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack1", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack1" + ); + + int numPartitions = 12; + int replicationFactor = 3; + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, false); + } + + @Test + public void 
testAssignmentWith2ReplicasRackAware() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack1" + ); + + int numPartitions = 12; + int replicationFactor = 2; + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testRackAwareExpansion() { + Map<Integer, String> brokerRackMapping = Map.of( + 6, "rack1", + 7, "rack2", + 8, "rack2", + 9, "rack3", + 10, "rack3", + 11, "rack1" + ); + + int numPartitions = 12; + int replicationFactor = 2; + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor, -1, 12); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testAssignmentWith2ReplicasRackAwareWith6Partitions() { + // Define the brokerRackMapping in Java + Map<Integer, String> brokerRackMapping = Map.of( + 6, "rack1", + 7, "rack2", + 8, "rack2", + 9, "rack3", + 10, "rack3", + 11, "rack1" + ); + + int numPartitions = 6; + int replicationFactor = 2; + + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testAssignmentWith2ReplicasRackAwareWith6PartitionsAnd3Brokers() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 4, "rack3" + ); + + int numPartitions = 3; + int replicationFactor = 2; + + Map<Integer, List<Integer>> assignment = + 
AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testLargeNumberPartitionsAssignment() { + Map<Integer, String> brokerRackMapping = new HashMap<>(); + brokerRackMapping.put(0, "rack1"); + brokerRackMapping.put(1, "rack2"); + brokerRackMapping.put(2, "rack2"); + brokerRackMapping.put(3, "rack3"); + brokerRackMapping.put(4, "rack3"); + brokerRackMapping.put(5, "rack1"); + brokerRackMapping.put(6, "rack1"); + brokerRackMapping.put(7, "rack2"); + brokerRackMapping.put(8, "rack2"); + brokerRackMapping.put(9, "rack3"); + brokerRackMapping.put(10, "rack1"); + brokerRackMapping.put(11, "rack3"); + + int numPartitions = 96; + int replicationFactor = 3; + + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testMoreReplicasThanRacks() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack2" + ); + + int numPartitions = 6; + int replicationFactor = 5; + + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + for (List<Integer> replicas : assignment.values()) { Review Comment: `forEach` ########## server-common/src/test/java/org/apache/kafka/admin/AdminRackAwareTest.java: ########## @@ -0,0 +1,540 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.admin; + +import org.apache.kafka.common.errors.InvalidPartitionsException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.server.common.AdminOperationException; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class AdminRackAwareTest { + + private static Collection<BrokerMetadata> toBrokerMetadata(Map<Integer, String> rackMap) { + List<BrokerMetadata> brokerMetadatas = rackMap.entrySet().stream() + .map(entry -> new BrokerMetadata(entry.getKey(), Optional.ofNullable(entry.getValue()))) + .collect(Collectors.toList()); + + brokerMetadatas.sort(Comparator.comparingInt(b -> b.id)); + + return brokerMetadatas; + } + + private static ReplicaDistributions getReplicaDistribution(Map<Integer, List<Integer>> assignment, 
Map<Integer, String> brokerRackMapping) { + Map<Integer, Integer> leaderCount = new HashMap<>(); + Map<Integer, Integer> partitionCount = new HashMap<>(); + Map<Integer, List<String>> partitionRackMap = new HashMap<>(); + + for (Map.Entry<Integer, List<Integer>> entry : assignment.entrySet()) { + Integer partitionId = entry.getKey(); + List<Integer> replicaList = entry.getValue(); + + // The leader is the first broker in the list + Integer leader = replicaList.get(0); + leaderCount.put(leader, leaderCount.getOrDefault(leader, 0) + 1); + + for (Integer brokerId : replicaList) { + partitionCount.put(brokerId, partitionCount.getOrDefault(brokerId, 0) + 1); + + String rack = brokerRackMapping.get(brokerId); + + // Add the rack to the list of racks for this partition + partitionRackMap.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(rack); + } + } + + return new ReplicaDistributions(partitionRackMap, leaderCount, partitionCount); + } + + public static void assertReplicaDistribution( + Map<Integer, List<Integer>> assignment, + Map<Integer, String> brokerRackMapping, + int numBrokers, + int numPartitions, + int replicationFactor, + boolean verifyRackAware, + boolean verifyLeaderDistribution, + boolean verifyReplicasDistribution) { + + for (Map.Entry<Integer, List<Integer>> entry : assignment.entrySet()) { + List<Integer> brokerList = entry.getValue(); + Set<Integer> brokerSet = new HashSet<>(brokerList); + assertEquals(brokerSet.size(), brokerList.size()); + } + + ReplicaDistributions distribution = getReplicaDistribution(assignment, brokerRackMapping); + + // Verify Rack-Awareness + if (verifyRackAware) { + Map<Integer, List<String>> partitionRackMap = distribution.getPartitionRacks(); + List<Integer> distinctRackSizes = partitionRackMap.values().stream() + .map(rackList -> (int) rackList.stream().distinct().count()) + .collect(Collectors.toList()); + + List<Integer> expectedRackSizes = Collections.nCopies(numPartitions, replicationFactor); + 
assertEquals(expectedRackSizes, distinctRackSizes); + } + + // Verify Leader Distribution + if (verifyLeaderDistribution) { + Map<Integer, Integer> leaderCount = distribution.getBrokerLeaderCount(); + int leaderCountPerBroker = numPartitions / numBrokers; + List<Integer> expectedLeaderCounts = Collections.nCopies(numBrokers, leaderCountPerBroker); + List<Integer> actualLeaderCounts = new ArrayList<>(leaderCount.values()); + assertEquals(expectedLeaderCounts, actualLeaderCounts); + } + + // Verify Replicas Distribution + if (verifyReplicasDistribution) { + Map<Integer, Integer> replicasCount = distribution.getBrokerReplicasCount(); + int numReplicasPerBroker = (numPartitions * replicationFactor) / numBrokers; + List<Integer> expectedReplicasCounts = Collections.nCopies(numBrokers, numReplicasPerBroker); + List<Integer> actualReplicasCounts = new ArrayList<>(replicasCount.values()); + assertEquals(expectedReplicasCounts, actualReplicasCounts); + } + } + + @Test + public void testGetRackAlternatedBrokerListAndAssignReplicasToBrokers() { + Map<Integer, String> rackMap = Map.of( + 0, "rack1", + 1, "rack3", + 2, "rack3", + 3, "rack2", + 4, "rack2", + 5, "rack1" + ); + + List<Integer> newList = AdminUtils.getRackAlternatedBrokerList(rackMap); + assertEquals(Arrays.asList(0, 3, 1, 5, 4, 2), newList); + + Map<Integer, String> anotherRackMap = new HashMap<>(rackMap); + anotherRackMap.remove(5); + List<Integer> anotherList = AdminUtils.getRackAlternatedBrokerList(anotherRackMap); + assertEquals(Arrays.asList(0, 3, 1, 4, 2), anotherList); + + Map<Integer, List<Integer>> assignment = AdminUtils. 
+ assignReplicasToBrokers(toBrokerMetadata(rackMap), 7, 3, 0, 0); + + + Map<Integer, List<Integer>> expected = Map.of( + 0, Arrays.asList(0, 3, 1), + 1, Arrays.asList(3, 1, 5), + 2, Arrays.asList(1, 5, 4), + 3, Arrays.asList(5, 4, 2), + 4, Arrays.asList(4, 2, 0), + 5, Arrays.asList(2, 0, 3), + 6, Arrays.asList(0, 4, 2) + ); + + assertEquals(expected, assignment); + } + + @Test + public void testAssignmentWithRackAware() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack1" + ); + + int numPartitions = 6; + int replicationFactor = 3; + Map<Integer, List<Integer>> assignment = AdminUtils. + assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor, 2, 0); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testAssignmentWithRackAwareWithRandomStartIndex() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack1" + ); + + int numPartitions = 6; + int replicationFactor = 3; + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testAssignmentWithRackAwareWithUnevenReplicas() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack1" + ); + + int numPartitions = 13; + int replicationFactor = 3; + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor, 0, 0); + + assertReplicaDistribution(assignment, brokerRackMapping, 
brokerRackMapping.size(), + numPartitions, replicationFactor, true, false, false); + } + + @Test + public void testAssignmentWithRackAwareWithUnevenRacks() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack1", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack1" + ); + + int numPartitions = 12; + int replicationFactor = 3; + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, false); + } + + @Test + public void testAssignmentWith2ReplicasRackAware() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack1" + ); + + int numPartitions = 12; + int replicationFactor = 2; + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testRackAwareExpansion() { + Map<Integer, String> brokerRackMapping = Map.of( + 6, "rack1", + 7, "rack2", + 8, "rack2", + 9, "rack3", + 10, "rack3", + 11, "rack1" + ); + + int numPartitions = 12; + int replicationFactor = 2; + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor, -1, 12); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testAssignmentWith2ReplicasRackAwareWith6Partitions() { + // Define the brokerRackMapping in Java + Map<Integer, String> brokerRackMapping = Map.of( + 6, "rack1", + 7, "rack2", + 8, "rack2", + 9, "rack3", + 
10, "rack3", + 11, "rack1" + ); + + int numPartitions = 6; + int replicationFactor = 2; + + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testAssignmentWith2ReplicasRackAwareWith6PartitionsAnd3Brokers() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 4, "rack3" + ); + + int numPartitions = 3; + int replicationFactor = 2; + + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testLargeNumberPartitionsAssignment() { + Map<Integer, String> brokerRackMapping = new HashMap<>(); + brokerRackMapping.put(0, "rack1"); + brokerRackMapping.put(1, "rack2"); + brokerRackMapping.put(2, "rack2"); + brokerRackMapping.put(3, "rack3"); + brokerRackMapping.put(4, "rack3"); + brokerRackMapping.put(5, "rack1"); + brokerRackMapping.put(6, "rack1"); + brokerRackMapping.put(7, "rack2"); + brokerRackMapping.put(8, "rack2"); + brokerRackMapping.put(9, "rack3"); + brokerRackMapping.put(10, "rack1"); + brokerRackMapping.put(11, "rack3"); + + int numPartitions = 96; + int replicationFactor = 3; + + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testMoreReplicasThanRacks() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 2, 
"rack2", + 3, "rack3", + 4, "rack3", + 5, "rack2" + ); + + int numPartitions = 6; + int replicationFactor = 5; + + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + for (List<Integer> replicas : assignment.values()) { + assertEquals(replicationFactor, replicas.size()); + } + + ReplicaDistributions distribution = getReplicaDistribution(assignment, brokerRackMapping); + + // Verify that each partition is spread across exactly 3 unique racks + for (int partition = 0; partition < numPartitions; partition++) { + List<String> racks = distribution.getPartitionRacks().get(partition); + long distinctRackCount = racks.stream().distinct().count(); + assertEquals(3, distinctRackCount); + } + } + + @Test + public void testLessReplicasThanRacks() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack2" + ); + + int numPartitions = 6; + int replicationFactor = 2; + + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + // Assert that each partition is assigned to the correct number of replicas + for (List<Integer> replicas : assignment.values()) { + assertEquals(replicationFactor, replicas.size()); + } + + // Get the replica distribution + ReplicaDistributions distribution = getReplicaDistribution(assignment, brokerRackMapping); + + // Verify that each partition is spread across exactly 2 unique racks + for (int partition = 0; partition <= 5; partition++) { + List<String> racks = distribution.getPartitionRacks().get(partition); + long distinctRackCount = racks.stream().distinct().count(); + assertEquals(2, distinctRackCount); + } + + } + + @Test + public void testSingleRack() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack1", + 2, "rack1", + 3, "rack1", + 4, "rack1", + 5, 
"rack1" + ); + + int numPartitions = 6; + int replicationFactor = 3; + + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + for (List<Integer> replicas : assignment.values()) { Review Comment: `forEach` ########## server-common/src/test/java/org/apache/kafka/admin/AdminRackAwareTest.java: ########## @@ -0,0 +1,540 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.admin; + +import org.apache.kafka.common.errors.InvalidPartitionsException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.server.common.AdminOperationException; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class AdminRackAwareTest { + + private static Collection<BrokerMetadata> toBrokerMetadata(Map<Integer, String> rackMap) { + List<BrokerMetadata> brokerMetadatas = rackMap.entrySet().stream() + .map(entry -> new BrokerMetadata(entry.getKey(), Optional.ofNullable(entry.getValue()))) + .collect(Collectors.toList()); + + brokerMetadatas.sort(Comparator.comparingInt(b -> b.id)); + + return brokerMetadatas; + } + + private static ReplicaDistributions getReplicaDistribution(Map<Integer, List<Integer>> assignment, Map<Integer, String> brokerRackMapping) { + Map<Integer, Integer> leaderCount = new HashMap<>(); + Map<Integer, Integer> partitionCount = new HashMap<>(); + Map<Integer, List<String>> partitionRackMap = new HashMap<>(); + + for (Map.Entry<Integer, List<Integer>> entry : assignment.entrySet()) { + Integer partitionId = entry.getKey(); + List<Integer> replicaList = entry.getValue(); + + // The leader is the first broker in the list + Integer leader = replicaList.get(0); + leaderCount.put(leader, leaderCount.getOrDefault(leader, 0) + 1); + + for (Integer brokerId : replicaList) { + partitionCount.put(brokerId, partitionCount.getOrDefault(brokerId, 0) + 1); + + String rack = 
brokerRackMapping.get(brokerId); + + // Add the rack to the list of racks for this partition + partitionRackMap.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(rack); + } + } + + return new ReplicaDistributions(partitionRackMap, leaderCount, partitionCount); + } + + public static void assertReplicaDistribution( + Map<Integer, List<Integer>> assignment, + Map<Integer, String> brokerRackMapping, + int numBrokers, + int numPartitions, + int replicationFactor, + boolean verifyRackAware, + boolean verifyLeaderDistribution, + boolean verifyReplicasDistribution) { + + for (Map.Entry<Integer, List<Integer>> entry : assignment.entrySet()) { + List<Integer> brokerList = entry.getValue(); + Set<Integer> brokerSet = new HashSet<>(brokerList); + assertEquals(brokerSet.size(), brokerList.size()); + } + + ReplicaDistributions distribution = getReplicaDistribution(assignment, brokerRackMapping); + + // Verify Rack-Awareness + if (verifyRackAware) { + Map<Integer, List<String>> partitionRackMap = distribution.getPartitionRacks(); + List<Integer> distinctRackSizes = partitionRackMap.values().stream() + .map(rackList -> (int) rackList.stream().distinct().count()) + .collect(Collectors.toList()); + + List<Integer> expectedRackSizes = Collections.nCopies(numPartitions, replicationFactor); + assertEquals(expectedRackSizes, distinctRackSizes); + } + + // Verify Leader Distribution + if (verifyLeaderDistribution) { + Map<Integer, Integer> leaderCount = distribution.getBrokerLeaderCount(); + int leaderCountPerBroker = numPartitions / numBrokers; + List<Integer> expectedLeaderCounts = Collections.nCopies(numBrokers, leaderCountPerBroker); + List<Integer> actualLeaderCounts = new ArrayList<>(leaderCount.values()); + assertEquals(expectedLeaderCounts, actualLeaderCounts); + } + + // Verify Replicas Distribution + if (verifyReplicasDistribution) { + Map<Integer, Integer> replicasCount = distribution.getBrokerReplicasCount(); + int numReplicasPerBroker = (numPartitions * 
replicationFactor) / numBrokers; + List<Integer> expectedReplicasCounts = Collections.nCopies(numBrokers, numReplicasPerBroker); + List<Integer> actualReplicasCounts = new ArrayList<>(replicasCount.values()); + assertEquals(expectedReplicasCounts, actualReplicasCounts); + } + } + + @Test + public void testGetRackAlternatedBrokerListAndAssignReplicasToBrokers() { + Map<Integer, String> rackMap = Map.of( + 0, "rack1", + 1, "rack3", + 2, "rack3", + 3, "rack2", + 4, "rack2", + 5, "rack1" + ); + + List<Integer> newList = AdminUtils.getRackAlternatedBrokerList(rackMap); + assertEquals(Arrays.asList(0, 3, 1, 5, 4, 2), newList); + + Map<Integer, String> anotherRackMap = new HashMap<>(rackMap); + anotherRackMap.remove(5); + List<Integer> anotherList = AdminUtils.getRackAlternatedBrokerList(anotherRackMap); + assertEquals(Arrays.asList(0, 3, 1, 4, 2), anotherList); + + Map<Integer, List<Integer>> assignment = AdminUtils. + assignReplicasToBrokers(toBrokerMetadata(rackMap), 7, 3, 0, 0); + + + Map<Integer, List<Integer>> expected = Map.of( + 0, Arrays.asList(0, 3, 1), + 1, Arrays.asList(3, 1, 5), + 2, Arrays.asList(1, 5, 4), + 3, Arrays.asList(5, 4, 2), + 4, Arrays.asList(4, 2, 0), + 5, Arrays.asList(2, 0, 3), + 6, Arrays.asList(0, 4, 2) + ); + + assertEquals(expected, assignment); + } + + @Test + public void testAssignmentWithRackAware() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack1" + ); + + int numPartitions = 6; + int replicationFactor = 3; + Map<Integer, List<Integer>> assignment = AdminUtils. 
+ assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor, 2, 0); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testAssignmentWithRackAwareWithRandomStartIndex() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack1" + ); + + int numPartitions = 6; + int replicationFactor = 3; + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testAssignmentWithRackAwareWithUnevenReplicas() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack1" + ); + + int numPartitions = 13; + int replicationFactor = 3; + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor, 0, 0); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, false, false); + } + + @Test + public void testAssignmentWithRackAwareWithUnevenRacks() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack1", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack1" + ); + + int numPartitions = 12; + int replicationFactor = 3; + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, false); + } + + @Test + public void 
testAssignmentWith2ReplicasRackAware() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack1" + ); + + int numPartitions = 12; + int replicationFactor = 2; + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testRackAwareExpansion() { + Map<Integer, String> brokerRackMapping = Map.of( + 6, "rack1", + 7, "rack2", + 8, "rack2", + 9, "rack3", + 10, "rack3", + 11, "rack1" + ); + + int numPartitions = 12; + int replicationFactor = 2; + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor, -1, 12); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testAssignmentWith2ReplicasRackAwareWith6Partitions() { + // Define the brokerRackMapping in Java + Map<Integer, String> brokerRackMapping = Map.of( + 6, "rack1", + 7, "rack2", + 8, "rack2", + 9, "rack3", + 10, "rack3", + 11, "rack1" + ); + + int numPartitions = 6; + int replicationFactor = 2; + + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testAssignmentWith2ReplicasRackAwareWith6PartitionsAnd3Brokers() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 4, "rack3" + ); + + int numPartitions = 3; + int replicationFactor = 2; + + Map<Integer, List<Integer>> assignment = + 
AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testLargeNumberPartitionsAssignment() { + Map<Integer, String> brokerRackMapping = new HashMap<>(); + brokerRackMapping.put(0, "rack1"); + brokerRackMapping.put(1, "rack2"); + brokerRackMapping.put(2, "rack2"); + brokerRackMapping.put(3, "rack3"); + brokerRackMapping.put(4, "rack3"); + brokerRackMapping.put(5, "rack1"); + brokerRackMapping.put(6, "rack1"); + brokerRackMapping.put(7, "rack2"); + brokerRackMapping.put(8, "rack2"); + brokerRackMapping.put(9, "rack3"); + brokerRackMapping.put(10, "rack1"); + brokerRackMapping.put(11, "rack3"); + + int numPartitions = 96; + int replicationFactor = 3; + + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testMoreReplicasThanRacks() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack2" + ); + + int numPartitions = 6; + int replicationFactor = 5; + + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + for (List<Integer> replicas : assignment.values()) { + assertEquals(replicationFactor, replicas.size()); + } + + ReplicaDistributions distribution = getReplicaDistribution(assignment, brokerRackMapping); + + // Verify that each partition is spread across exactly 3 unique racks + for (int partition = 0; partition < numPartitions; partition++) { + List<String> racks = 
distribution.getPartitionRacks().get(partition); + long distinctRackCount = racks.stream().distinct().count(); + assertEquals(3, distinctRackCount); + } + } + + @Test + public void testLessReplicasThanRacks() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack2" + ); + + int numPartitions = 6; + int replicationFactor = 2; + + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + // Assert that each partition is assigned to the correct number of replicas + for (List<Integer> replicas : assignment.values()) { Review Comment: This loop over `assignment.values()` can be replaced with `forEach`. ########## server-common/src/test/java/org/apache/kafka/admin/AdminRackAwareTest.java: ########## @@ -0,0 +1,540 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.admin; + +import org.apache.kafka.common.errors.InvalidPartitionsException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.server.common.AdminOperationException; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class AdminRackAwareTest { + + private static Collection<BrokerMetadata> toBrokerMetadata(Map<Integer, String> rackMap) { + List<BrokerMetadata> brokerMetadatas = rackMap.entrySet().stream() + .map(entry -> new BrokerMetadata(entry.getKey(), Optional.ofNullable(entry.getValue()))) + .collect(Collectors.toList()); + + brokerMetadatas.sort(Comparator.comparingInt(b -> b.id)); + + return brokerMetadatas; + } + + private static ReplicaDistributions getReplicaDistribution(Map<Integer, List<Integer>> assignment, Map<Integer, String> brokerRackMapping) { + Map<Integer, Integer> leaderCount = new HashMap<>(); + Map<Integer, Integer> partitionCount = new HashMap<>(); + Map<Integer, List<String>> partitionRackMap = new HashMap<>(); + + for (Map.Entry<Integer, List<Integer>> entry : assignment.entrySet()) { + Integer partitionId = entry.getKey(); + List<Integer> replicaList = entry.getValue(); + + // The leader is the first broker in the list + Integer leader = replicaList.get(0); + leaderCount.put(leader, leaderCount.getOrDefault(leader, 0) + 1); + + for (Integer brokerId : replicaList) { + partitionCount.put(brokerId, partitionCount.getOrDefault(brokerId, 0) + 1); + + String rack = 
brokerRackMapping.get(brokerId); + + // Add the rack to the list of racks for this partition + partitionRackMap.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(rack); + } + } + + return new ReplicaDistributions(partitionRackMap, leaderCount, partitionCount); + } + + public static void assertReplicaDistribution( + Map<Integer, List<Integer>> assignment, + Map<Integer, String> brokerRackMapping, + int numBrokers, + int numPartitions, + int replicationFactor, + boolean verifyRackAware, + boolean verifyLeaderDistribution, + boolean verifyReplicasDistribution) { + + for (Map.Entry<Integer, List<Integer>> entry : assignment.entrySet()) { + List<Integer> brokerList = entry.getValue(); + Set<Integer> brokerSet = new HashSet<>(brokerList); + assertEquals(brokerSet.size(), brokerList.size()); + } + + ReplicaDistributions distribution = getReplicaDistribution(assignment, brokerRackMapping); + + // Verify Rack-Awareness + if (verifyRackAware) { + Map<Integer, List<String>> partitionRackMap = distribution.getPartitionRacks(); + List<Integer> distinctRackSizes = partitionRackMap.values().stream() + .map(rackList -> (int) rackList.stream().distinct().count()) + .collect(Collectors.toList()); + + List<Integer> expectedRackSizes = Collections.nCopies(numPartitions, replicationFactor); + assertEquals(expectedRackSizes, distinctRackSizes); + } + + // Verify Leader Distribution + if (verifyLeaderDistribution) { + Map<Integer, Integer> leaderCount = distribution.getBrokerLeaderCount(); + int leaderCountPerBroker = numPartitions / numBrokers; + List<Integer> expectedLeaderCounts = Collections.nCopies(numBrokers, leaderCountPerBroker); + List<Integer> actualLeaderCounts = new ArrayList<>(leaderCount.values()); + assertEquals(expectedLeaderCounts, actualLeaderCounts); + } + + // Verify Replicas Distribution + if (verifyReplicasDistribution) { + Map<Integer, Integer> replicasCount = distribution.getBrokerReplicasCount(); + int numReplicasPerBroker = (numPartitions * 
replicationFactor) / numBrokers; + List<Integer> expectedReplicasCounts = Collections.nCopies(numBrokers, numReplicasPerBroker); + List<Integer> actualReplicasCounts = new ArrayList<>(replicasCount.values()); + assertEquals(expectedReplicasCounts, actualReplicasCounts); + } + } + + @Test + public void testGetRackAlternatedBrokerListAndAssignReplicasToBrokers() { + Map<Integer, String> rackMap = Map.of( + 0, "rack1", + 1, "rack3", + 2, "rack3", + 3, "rack2", + 4, "rack2", + 5, "rack1" + ); + + List<Integer> newList = AdminUtils.getRackAlternatedBrokerList(rackMap); + assertEquals(Arrays.asList(0, 3, 1, 5, 4, 2), newList); + + Map<Integer, String> anotherRackMap = new HashMap<>(rackMap); + anotherRackMap.remove(5); + List<Integer> anotherList = AdminUtils.getRackAlternatedBrokerList(anotherRackMap); + assertEquals(Arrays.asList(0, 3, 1, 4, 2), anotherList); + + Map<Integer, List<Integer>> assignment = AdminUtils. + assignReplicasToBrokers(toBrokerMetadata(rackMap), 7, 3, 0, 0); + + + Map<Integer, List<Integer>> expected = Map.of( + 0, Arrays.asList(0, 3, 1), + 1, Arrays.asList(3, 1, 5), + 2, Arrays.asList(1, 5, 4), + 3, Arrays.asList(5, 4, 2), + 4, Arrays.asList(4, 2, 0), + 5, Arrays.asList(2, 0, 3), + 6, Arrays.asList(0, 4, 2) + ); + + assertEquals(expected, assignment); + } + + @Test + public void testAssignmentWithRackAware() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack1" + ); + + int numPartitions = 6; + int replicationFactor = 3; + Map<Integer, List<Integer>> assignment = AdminUtils. 
+ assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor, 2, 0); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testAssignmentWithRackAwareWithRandomStartIndex() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack1" + ); + + int numPartitions = 6; + int replicationFactor = 3; + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testAssignmentWithRackAwareWithUnevenReplicas() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack1" + ); + + int numPartitions = 13; + int replicationFactor = 3; + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor, 0, 0); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, false, false); + } + + @Test + public void testAssignmentWithRackAwareWithUnevenRacks() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack1", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack1" + ); + + int numPartitions = 12; + int replicationFactor = 3; + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, false); + } + + @Test + public void 
testAssignmentWith2ReplicasRackAware() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack1" + ); + + int numPartitions = 12; + int replicationFactor = 2; + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testRackAwareExpansion() { + Map<Integer, String> brokerRackMapping = Map.of( + 6, "rack1", + 7, "rack2", + 8, "rack2", + 9, "rack3", + 10, "rack3", + 11, "rack1" + ); + + int numPartitions = 12; + int replicationFactor = 2; + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor, -1, 12); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testAssignmentWith2ReplicasRackAwareWith6Partitions() { + // Define the brokerRackMapping in Java + Map<Integer, String> brokerRackMapping = Map.of( + 6, "rack1", + 7, "rack2", + 8, "rack2", + 9, "rack3", + 10, "rack3", + 11, "rack1" + ); + + int numPartitions = 6; + int replicationFactor = 2; + + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testAssignmentWith2ReplicasRackAwareWith6PartitionsAnd3Brokers() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 4, "rack3" + ); + + int numPartitions = 3; + int replicationFactor = 2; + + Map<Integer, List<Integer>> assignment = + 
AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testLargeNumberPartitionsAssignment() { + Map<Integer, String> brokerRackMapping = new HashMap<>(); + brokerRackMapping.put(0, "rack1"); + brokerRackMapping.put(1, "rack2"); + brokerRackMapping.put(2, "rack2"); + brokerRackMapping.put(3, "rack3"); + brokerRackMapping.put(4, "rack3"); + brokerRackMapping.put(5, "rack1"); + brokerRackMapping.put(6, "rack1"); + brokerRackMapping.put(7, "rack2"); + brokerRackMapping.put(8, "rack2"); + brokerRackMapping.put(9, "rack3"); + brokerRackMapping.put(10, "rack1"); + brokerRackMapping.put(11, "rack3"); + + int numPartitions = 96; + int replicationFactor = 3; + + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + assertReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size(), + numPartitions, replicationFactor, true, true, true); + } + + @Test + public void testMoreReplicasThanRacks() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack2" + ); + + int numPartitions = 6; + int replicationFactor = 5; + + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + for (List<Integer> replicas : assignment.values()) { + assertEquals(replicationFactor, replicas.size()); + } + + ReplicaDistributions distribution = getReplicaDistribution(assignment, brokerRackMapping); + + // Verify that each partition is spread across exactly 3 unique racks + for (int partition = 0; partition < numPartitions; partition++) { + List<String> racks = 
distribution.getPartitionRacks().get(partition); + long distinctRackCount = racks.stream().distinct().count(); + assertEquals(3, distinctRackCount); + } + } + + @Test + public void testLessReplicasThanRacks() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack2", + 2, "rack2", + 3, "rack3", + 4, "rack3", + 5, "rack2" + ); + + int numPartitions = 6; + int replicationFactor = 2; + + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + // Assert that each partition is assigned to the correct number of replicas + for (List<Integer> replicas : assignment.values()) { + assertEquals(replicationFactor, replicas.size()); + } + + // Get the replica distribution + ReplicaDistributions distribution = getReplicaDistribution(assignment, brokerRackMapping); + + // Verify that each partition is spread across exactly 2 unique racks + for (int partition = 0; partition <= 5; partition++) { + List<String> racks = distribution.getPartitionRacks().get(partition); + long distinctRackCount = racks.stream().distinct().count(); + assertEquals(2, distinctRackCount); + } + + } + + @Test + public void testSingleRack() { + Map<Integer, String> brokerRackMapping = Map.of( + 0, "rack1", + 1, "rack1", + 2, "rack1", + 3, "rack1", + 4, "rack1", + 5, "rack1" + ); + + int numPartitions = 6; + int replicationFactor = 3; + + Map<Integer, List<Integer>> assignment = + AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor); + + for (List<Integer> replicas : assignment.values()) { + assertEquals(replicationFactor, replicas.size()); + } + + ReplicaDistributions distribution = getReplicaDistribution(assignment, brokerRackMapping); + + for (int partition = 0; partition < numPartitions; partition++) { + List<String> racks = distribution.getPartitionRacks().get(partition); + long distinctRackCount = racks.stream().distinct().count(); + 
assertEquals(1, distinctRackCount); + } + + for (Integer broker : brokerRackMapping.keySet()) { + int leaderCount = distribution.getBrokerLeaderCount().getOrDefault(broker, 0); + assertEquals(1, leaderCount); + } Review Comment: Inline the `leaderCount` variable and replace this loop over `brokerRackMapping.keySet()` with `forEach`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org