cmccabe commented on a change in pull request #10494:
URL: https://github.com/apache/kafka/pull/10494#discussion_r633926611
##########
File path: metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java
##########
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.metadata.OptionalStringComparator;
+import org.apache.kafka.metadata.UsableBroker;
+
+
+/**
+ * The striped replica placer.
+ *
+ *
+ * GOALS
+ * The design of this placer attempts to satisfy a few competing goals. Firstly, we want
+ * to spread the replicas as evenly as we can across racks. In the simple case where
+ * broker racks have not been configured, this goal is a no-op, of course. But it is the
+ * highest priority goal in multi-rack clusters.
+ *
+ * Our second goal is to spread the replicas evenly across brokers. Since we are placing
+ * multiple partitions, we try to avoid putting each partition on the same set of
+ * replicas, even if it does satisfy the rack placement goal. However, we treat the rack
+ * placement goal as higher priority than this goal-- if you configure 10 brokers in rack
+ * A and B, and 1 broker in rack C, you will end up with a lot of partitions on that one
+ * broker in rack C. If you were to place a lot of partitions with replication factor 3,
+ * each partition would try to get a replica there. In general racks are supposed to be
+ * about the same size -- if they aren't, this is a user error.
+ *
+ * Thirdly, we would prefer to place replicas on unfenced brokers, rather than on fenced
+ * brokers.
+ *
+ *
+ * CONSTRAINTS
+ * In addition to these goals, we have two constraints. Unlike the goals, these are not
+ * optional -- they are mandatory. Placement will fail if a constraint cannot be
+ * satisfied. The first constraint is that we can't place more than one replica on the
+ * same broker. This imposes an upper limit on replication factor-- for example, a 3-node
+ * cluster can't have any topics with replication factor 4. This constraint comes from
+ * Kafka's internal design.
+ *
+ * The second constraint is that the leader of each partition must be an unfenced broker.
+ * This constraint is a bit arbitrary. In theory, we could allow people to create
+ * new topics even if every broker were fenced. However, this would be confusing for
+ * users.
+ *
+ *
+ * ALGORITHM
+ * The StripedReplicaPlacer constructor loads the broker data into rack objects. Each
+ * rack object contains a sorted list of fenced brokers, and a separate sorted list of
+ * unfenced brokers. The racks themselves are organized into a sorted list, stored inside
+ * the top-level RackList object.
+ *
+ * The general idea is that we place replicas on to racks in a round-robin fashion. So if
+ * we had racks A, B, C, and D, and we were creating a new partition with replication
+ * factor 3, our first replica might come from A, our second from B, and our third from C.
+ * Of course our placement would not be very fair if we always started with rack A.
+ * Therefore, we generate a random starting offset when the RackList is created. So one
+ * time we might go B, C, D. Another time we might go C, D, A. And so forth.
+ *
+ * Note that each partition we generate advances the starting offset by one.
+ * So in our 4-rack cluster, with 3 partitions, we might choose these racks:
+ *
+ * partition 1: A, B, C
+ * partition 2: B, C, A
+ * partition 3: C, A, B
+ *
+ * This is what generates the characteristic "striped" pattern of this placer.
+ *
+ * So far I haven't said anything about how we choose a replica from within a rack. In
+ * fact, this is also done in a round-robin fashion. So if rack A had replica A0, A1, A2,
+ * and A3, we might return A0 the first time, A1, the second, A2 the third, and so on.
+ * Just like with the racks, we add a random starting offset to mix things up a bit.
+ *
+ * So let's say you had a cluster with racks A, B, and C, and each rack had 3 replicas,
+ * for 9 nodes in total.
+ * If all the offsets were 0, you'd get placements like this:
+ *
+ * partition 1: A0, B0, C0
+ * partition 2: B1, C1, A1
+ * partition 3: C2, A2, B2
+ *
+ * One additional complication with choosing a replica within a rack is that we want to
+ * choose the unfenced replicas first. In a big cluster with lots of nodes available,
+ * we'd prefer not to place a new partition on a node that is fenced. Therefore, we
+ * actually maintain two lists, rather than the single list I described above.
+ * We only start using the fenced node list when the unfenced node list is totally
+ * exhausted.
+ *
+ * Furthermore, we cannot place the first replica (the leader) of a new partition on a
+ * fenced replica. Therefore, we have some special logic to ensure that this doesn't
+ * happen.
+ */
+public class StripedReplicaPlacer implements ReplicaPlacer {
+    /**
+     * A list of brokers that we can iterate through.
+     */
+    static class BrokerList {
+        final static BrokerList EMPTY = new BrokerList();
+        private final List<Integer> brokers = new ArrayList<>(0);
+        private int epoch = 0;
+        private int index = 0;
+        private int offset = 0;
+
+        BrokerList add(int broker) {
+            this.brokers.add(broker);
+            return this;
+        }
+
+        /**
+         * Initialize this broker list by sorting it and randomizing the start offset.
+         *
+         * @param random The random number generator.
+         */
+        void initialize(Random random) {
+            if (!brokers.isEmpty()) {
+                brokers.sort(Integer::compareTo);
+                this.offset = random.nextInt(brokers.size());
+            }
+        }
+
+        /**
+         * Randomly shuffle the brokers in this list.
+         */
+        void shuffle(Random random) {
+            Collections.shuffle(brokers, random);

Review comment:
   I guess the thinking here is that resetting the offset should not be necessary since what is in the slots will have changed.
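   A minimal, self-contained sketch of that reasoning (the class below is a simplified stand-in written for this comment, not the actual `BrokerList` from the PR; the `peek()` helper and the fixed seed are made up for illustration). It shows why leaving `offset` alone after `Collections.shuffle` is safe: the shuffle permutes the slots in place without changing the list's size, so the previously randomized offset still indexes a valid, and now different, element.

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   import java.util.Random;

   // Simplified stand-in for the broker list; names are illustrative only.
   public class ShuffleOffsetSketch {
       private final List<Integer> brokers = new ArrayList<>();
       private int offset = 0;

       ShuffleOffsetSketch add(int broker) {
           brokers.add(broker);
           return this;
       }

       // Like the quoted initialize(): sort, then pick a random starting offset.
       void initialize(Random random) {
           if (!brokers.isEmpty()) {
               brokers.sort(Integer::compareTo);
               offset = random.nextInt(brokers.size());
           }
       }

       // Like the quoted shuffle(): permute in place, leave offset untouched.
       // The list size does not change, so offset still points at a valid slot.
       void shuffle(Random random) {
           Collections.shuffle(brokers, random);
       }

       // Hypothetical helper so the example can show which broker the offset selects.
       int peek() {
           return brokers.get(offset);
       }

       public static void main(String[] args) {
           Random random = new Random(42);
           ShuffleOffsetSketch list = new ShuffleOffsetSketch().add(3).add(1).add(2);
           list.initialize(random);
           System.out.println("selected before shuffle: " + list.peek());
           list.shuffle(random);
           // No offset reset needed: only the slot contents moved, not the index range.
           System.out.println("selected after shuffle:  " + list.peek());
       }
   }
   ```

   If the list could shrink between calls, the offset would need to be clamped or re-randomized, but a shuffle alone never invalidates it.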