junrao commented on a change in pull request #10494:
URL: https://github.com/apache/kafka/pull/10494#discussion_r631962390



##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java
##########
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.metadata.OptionalStringComparator;
+import org.apache.kafka.metadata.UsableBroker;
+
+
+/**
+ * The striped replica placer.
+ *
+ *
+ * GOALS
+ * The design of this placer attempts to satisfy a few competing goals.  Firstly, we want
+ * to spread the replicas as evenly as we can across racks.  In the simple case where
+ * broker racks have not been configured, this goal is a no-op, of course.  But it is the
+ * highest priority goal in multi-rack clusters.
+ *
+ * Our second goal is to spread the replicas evenly across brokers.  Since we are placing
+ * multiple partitions, we try to avoid putting each partition on the same set of
+ * brokers, even if it does satisfy the rack placement goal.  However, we treat the rack
+ * placement goal as higher priority than this goal -- if you configure 10 brokers in racks
+ * A and B, and 1 broker in rack C, you will end up with a lot of partitions on that one
+ * broker in rack C.  If you were to place a lot of partitions with replication factor 3,
+ * each partition would try to get a replica there.  In general racks are supposed to be
+ * about the same size -- if they aren't, this is a user error.
+ *
+ * Thirdly, we would prefer to place replicas on unfenced brokers, rather than on fenced
+ * brokers.
+ *
+ *
+ * CONSTRAINTS
+ * In addition to these goals, we have two constraints.  Unlike the goals, these are not
+ * optional -- they are mandatory.  Placement will fail if a constraint cannot be
+ * satisfied.  The first constraint is that we can't place more than one replica on the
+ * same broker.  This imposes an upper limit on replication factor -- for example, a 3-node
+ * cluster can't have any topics with replication factor 4.  This constraint comes from
+ * Kafka's internal design.
+ *
+ * The second constraint is that the leader of each partition must be an unfenced broker.
+ * This constraint is a bit arbitrary.  In theory, we could allow people to create
+ * new topics even if every broker were fenced.  However, this would be confusing for
+ * users.
+ *
+ *
+ * ALGORITHM
+ * The StripedReplicaPlacer constructor loads the broker data into rack objects.  Each
+ * rack object contains a sorted list of fenced brokers, and a separate sorted list of
+ * unfenced brokers.  The racks themselves are organized into a sorted list, stored inside
+ * the top-level RackList object.
+ *
+ * The general idea is that we place replicas onto racks in a round-robin fashion.  So if
+ * we had racks A, B, C, and D, and we were creating a new partition with replication
+ * factor 3, our first replica might come from A, our second from B, and our third from C.
+ * Of course our placement would not be very fair if we always started with rack A.
+ * Therefore, we generate a random starting offset when the RackList is created.  So one
+ * time we might go B, C, D.  Another time we might go C, D, A.  And so forth.
+ *
+ * Note that each partition we generate advances the starting offset by one.
+ * So in our 4-rack cluster, with 3 partitions, we might choose these racks:
+ *
+ * partition 1: A, B, C
+ * partition 2: B, C, D
+ * partition 3: C, D, A
+ *
+ * This is what generates the characteristic "striped" pattern of this placer.
+ *
+ * So far I haven't said anything about how we choose a replica from within a rack.  In
+ * fact, this is also done in a round-robin fashion.  So if rack A had brokers A0, A1, A2,
+ * and A3, we might return A0 the first time, A1 the second, A2 the third, and so on.
+ * Just like with the racks, we add a random starting offset to mix things up a bit.
+ *
+ * So let's say you had a cluster with racks A, B, and C, and each rack had 3 brokers,
+ * for 9 nodes in total.
+ * If all the offsets were 0, you'd get placements like this:
+ *
+ * partition 1: A0, B0, C0
+ * partition 2: B1, C1, A1
+ * partition 3: C2, A2, B2
+ *
+ * One additional complication with choosing a replica within a rack is that we want to
+ * choose the unfenced brokers first.  In a big cluster with lots of nodes available,
+ * we'd prefer not to place a new partition on a node that is fenced.  Therefore, we
+ * actually maintain two lists, rather than the single list I described above.
+ * We only start using the fenced node list when the unfenced node list is totally
+ * exhausted.
+ *
+ * Furthermore, we cannot place the first replica (the leader) of a new partition on a
+ * fenced broker.  Therefore, we have some special logic to ensure that this doesn't
+ * happen.
+ */
+public class StripedReplicaPlacer implements ReplicaPlacer {
+    /**
+     * A list of brokers that we can iterate through.
+     */
+    static class BrokerList {
+        final static BrokerList EMPTY = new BrokerList();
+        private final List<Integer> brokers = new ArrayList<>(0);
+        private int epoch = 0;
+        private int index = 0;
+        private int offset = 0;

Review comment:
       Could we add a brief comment explaining epoch, index, and offset?
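For reference, a sketch of what such comments might say. The descriptions are inferred only from how the quoted `next(int)` uses the three fields; `DocumentedBrokerList` is a hypothetical standalone copy, and `initialize`/`shuffle` are omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone copy of the quoted BrokerList with the requested field
// comments. The field semantics are inferred from next(int) as quoted.
final class DocumentedBrokerList {
    private final List<Integer> brokers = new ArrayList<>();

    /**
     * The epoch of the pass currently in progress. Passing a different epoch to
     * next() starts a new pass over the list.
     */
    private int epoch = 0;

    /**
     * How many brokers have been returned in the current pass. Once this reaches
     * brokers.size(), next() returns -1 until the epoch changes.
     */
    private int index = 0;

    /**
     * The position at which the current pass starts. It is advanced by one at the
     * start of each new epoch, so successive passes begin with different brokers.
     * (In the quoted code, initialize() also randomizes it once.)
     */
    private int offset = 0;

    DocumentedBrokerList add(int broker) {
        brokers.add(broker);
        return this;
    }

    /** Returns the next broker in the current pass, or -1 if the pass is exhausted. */
    int next(int epoch) {
        if (brokers.isEmpty()) return -1;
        if (this.epoch != epoch) {
            // A new pass: restart the count and rotate the starting position by one.
            this.epoch = epoch;
            this.index = 0;
            this.offset = (offset + 1) % brokers.size();
        }
        if (index >= brokers.size()) return -1;
        int broker = brokers.get((index + offset) % brokers.size());
        index++;
        return broker;
    }
}
```

Replaying the calls from `testBrokerList` against this copy gives 0, 1, 2, 3, -1, -1 in epoch 0 and then 1, 2, 3, 0, -1 in epoch 1: the first call with a new epoch resets `index` and advances `offset` from 0 to 1.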

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.controller.StripedReplicaPlacer.BrokerList;
+import org.apache.kafka.controller.StripedReplicaPlacer.RackList;
+import org.apache.kafka.metadata.UsableBroker;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+
+@Timeout(value = 40)
+public class StripedReplicaPlacerTest {
+    /**
+     * Test that the BrokerList class works as expected.
+     */
+    @Test
+    public void testBrokerList() {
+        assertEquals(0, BrokerList.EMPTY.size());
+        assertEquals(-1, BrokerList.EMPTY.next(1));
+        BrokerList brokers = new BrokerList().add(0).add(1).add(2).add(3);
+        assertEquals(4, brokers.size());
+        assertEquals(0, brokers.next(0));
+        assertEquals(1, brokers.next(0));
+        assertEquals(2, brokers.next(0));
+        assertEquals(3, brokers.next(0));
+        assertEquals(-1, brokers.next(0));
+        assertEquals(-1, brokers.next(0));
+        assertEquals(1, brokers.next(1));
+        assertEquals(2, brokers.next(1));
+        assertEquals(3, brokers.next(1));
+        assertEquals(0, brokers.next(1));
+        assertEquals(-1, brokers.next(1));
+    }
+
+    /**
+     * Test that we perform striped replica placement as expected, and don't use the
+     * fenced replica if we don't have to.
+     */
+    @Test
+    public void testAvoidFencedReplicaIfPossibleOnSingleRack() {
+        MockRandom random = new MockRandom();
+        RackList rackList = new RackList(random, Arrays.asList(
+            new UsableBroker(3, Optional.empty(), false),
+            new UsableBroker(1, Optional.empty(), true),
+            new UsableBroker(0, Optional.empty(), false),
+            new UsableBroker(4, Optional.empty(), false),
+            new UsableBroker(2, Optional.empty(), false)).iterator());
+        assertEquals(5, rackList.numTotalBrokers());
+        assertEquals(4, rackList.numUnfencedBrokers());
+        assertEquals(Collections.singletonList(Optional.empty()), rackList.rackNames());
+        assertThrows(InvalidReplicationFactorException.class, () -> rackList.place(0));
+        assertThrows(InvalidReplicationFactorException.class, () -> rackList.place(-1));
+        assertEquals(Arrays.asList(3, 4, 0, 2), rackList.place(4));
+        assertEquals(Arrays.asList(4, 0, 2, 3), rackList.place(4));
+        assertEquals(Arrays.asList(0, 2, 3, 4), rackList.place(4));
+        assertEquals(Arrays.asList(2, 3, 4, 0), rackList.place(4));
+        assertEquals(Arrays.asList(0, 4, 3, 2), rackList.place(4));
+    }
+
+    /**
+     * Test that we will place on the fenced replica if we need to.
+     */
+    @Test
+    public void testPlacementOnFencedReplicaOnSingleRack() {
+        MockRandom random = new MockRandom();
+        RackList rackList = new RackList(random, Arrays.asList(
+            new UsableBroker(3, Optional.empty(), false),
+            new UsableBroker(1, Optional.empty(), true),
+            new UsableBroker(2, Optional.empty(), false)).iterator());
+        assertEquals(3, rackList.numTotalBrokers());
+        assertEquals(2, rackList.numUnfencedBrokers());
+        assertEquals(Collections.singletonList(Optional.empty()), rackList.rackNames());
+        assertEquals(Arrays.asList(3, 2, 1), rackList.place(3));
+        assertEquals(Arrays.asList(2, 3, 1), rackList.place(3));
+        assertEquals(Arrays.asList(3, 2, 1), rackList.place(3));

Review comment:
       Hmm, we should have shuffled the broker list here. Why is the assignment pattern repeating here?
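One way to see how the sequence could repeat: if each placement only rotates the two unfenced brokers by one position and no reshuffle actually reorders them, the unfenced part has period 2. The simplified model below is hypothetical (it is not the PR's `RackList.place`); it assumes the rotation equals the placement epoch and that fenced brokers are appended after the unfenced ones. It reproduces the sequence asserted in `testPlacementOnFencedReplicaOnSingleRack`, so the repetition is consistent with the shuffle either not running or leaving the order unchanged under `MockRandom`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of single-rack placement with pure rotation and no shuffling:
// unfenced brokers first (so the leader is unfenced), then fenced brokers.
final class SingleRackRotation {
    static List<Integer> place(List<Integer> unfenced, List<Integer> fenced,
                               int epoch, int replicationFactor) {
        if (unfenced.isEmpty()) {
            throw new IllegalArgumentException("the leader must be an unfenced broker");
        }
        if (replicationFactor <= 0 || replicationFactor > unfenced.size() + fenced.size()) {
            throw new IllegalArgumentException("invalid replication factor " + replicationFactor);
        }
        List<Integer> result = new ArrayList<>(replicationFactor);
        appendRotated(unfenced, epoch, replicationFactor, result);
        appendRotated(fenced, epoch, replicationFactor, result);
        return result;
    }

    // Appends brokers starting at position (epoch % size), wrapping around, until the
    // list is exhausted or the result holds `limit` entries. Assumes epoch >= 0.
    private static void appendRotated(List<Integer> brokers, int epoch, int limit,
                                      List<Integer> out) {
        for (int i = 0; i < brokers.size() && out.size() < limit; i++) {
            out.add(brokers.get((i + epoch) % brokers.size()));
        }
    }
}
```

With `unfenced = [2, 3]` and `fenced = [1]`, epochs 1 through 4 give [3, 2, 1], [2, 3, 1], [3, 2, 1], [2, 3, 1], which matches the test's assertions.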

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java
##########
@@ -0,0 +1,411 @@
+public class StripedReplicaPlacer implements ReplicaPlacer {
+    /**
+     * A list of brokers that we can iterate through.
+     */
+    static class BrokerList {
+        final static BrokerList EMPTY = new BrokerList();
+        private final List<Integer> brokers = new ArrayList<>(0);
+        private int epoch = 0;
+        private int index = 0;
+        private int offset = 0;
+
+        BrokerList add(int broker) {
+            this.brokers.add(broker);
+            return this;
+        }
+
+        /**
+         * Initialize this broker list by sorting it and randomizing the start offset.
+         *
+         * @param random    The random number generator.
+         */
+        void initialize(Random random) {
+            if (!brokers.isEmpty()) {
+                brokers.sort(Integer::compareTo);
+                this.offset = random.nextInt(brokers.size());
+            }
+        }
+
+        /**
+         * Randomly shuffle the brokers in this list.
+         */
+        void shuffle(Random random) {
+            Collections.shuffle(brokers, random);
+        }
+
+        /**
+         * @return          The number of brokers in this list.
+         */
+        int size() {
+            return brokers.size();
+        }
+
+        /**
+         * Get the next broker in this list, or -1 if there are no more elements to be
+         * returned.
+         *
+         * @param epoch     The current iteration epoch.
+         *
+         * @return          The broker ID, or -1 if there are no more brokers to be
+         *                  returned in this epoch.
+         */
+        int next(int epoch) {
+            if (brokers.size() == 0) return -1;
+            if (this.epoch != epoch) {
+                this.epoch = epoch;
+                this.index = 0;
+                this.offset = (offset + 1) % brokers.size();
+            }
+            if (index >= brokers.size()) return -1;
+            int broker = brokers.get((index + offset) % brokers.size());
+            index++;
+            return broker;
+        }
+    }
+
+    /**
+     * A rack in the cluster, which contains brokers.
+     */
+    static class Rack {
+        private final BrokerList fenced = new BrokerList();
+        private final BrokerList unfenced = new BrokerList();
+
+        /**
+         * Initialize this rack.
+         *
+         * @param random    The random number generator.
+         */
+        void initialize(Random random) {
+            fenced.initialize(random);
+            unfenced.initialize(random);
+        }
+
+        void shuffle(Random random) {
+            fenced.shuffle(random);
+            unfenced.shuffle(random);
+        }
+
+        BrokerList fenced() {
+            return fenced;
+        }
+
+        BrokerList unfenced() {
+            return unfenced;
+        }
+
+        /**
+         * Get the next unfenced broker in this rack, or -1 if there are no more brokers
+         * to be returned.
+         *
+         * @param epoch     The current iteration epoch.
+         *
+         * @return          The broker ID, or -1 if there are no more brokers to be
+         *                  returned in this epoch.
+         */
+        int nextUnfenced(int epoch) {
+            return unfenced.next(epoch);
+        }
+
+        /**
+         * Get the next broker in this rack, or -1 if there are no more brokers to be
+         * returned.
+         *
+         * @param epoch     The current iteration epoch.
+         *
+         * @return          The broker ID, or -1 if there are no more brokers to be
+         *                  returned in this epoch.
+         */
+        int next(int epoch) {
+            int result = unfenced.next(epoch);
+            if (result >= 0) return result;
+            return fenced.next(epoch);
+        }
+    }
+
+    /**
+     * A list of racks that we can iterate through.
+     */
+    static class RackList {
+        /**
+         * The random number generator.
+         */
+        private final Random random;
+
+        /**
+         * A map from rack names to the brokers contained within them.
+         */
+        private final Map<Optional<String>, Rack> racks = new HashMap<>();
+
+        /**
+         * The names of all the racks in the cluster.
+         *
+         * Racks which have at least one unfenced broker come first (in sorted order),
+         * followed by racks which have only fenced brokers (also in sorted order).
+         */
+        private final List<Optional<String>> rackNames = new ArrayList<>();
+
+        /**
+         * The total number of brokers in the cluster, both fenced and unfenced.
+         */
+        private final int numTotalBrokers;
+
+        /**
+         * The total number of unfenced brokers in the cluster.
+         */
+        private final int numUnfencedBrokers;
+
+        /**
+         * The iteration epoch.
+         */
+        private int epoch = 0;
+
+        /**
+         * The offset we use to determine which rack is returned first.
+         */
+        private int offset;
+
+        RackList(Random random, Iterator<UsableBroker> iterator) {
+            this.random = random;
+            int numTotalBrokersCount = 0, numUnfencedBrokersCount = 0;
+            while (iterator.hasNext()) {
+                UsableBroker broker = iterator.next();
+                Rack rack = racks.get(broker.rack());
+                if (rack == null) {
+                    rackNames.add(broker.rack());
+                    rack = new Rack();
+                    racks.put(broker.rack(), rack);
+                }
+                if (broker.fenced()) {
+                    rack.fenced().add(broker.id());
+                } else {
+                    numUnfencedBrokersCount++;
+                    rack.unfenced().add(broker.id());
+                }
+                numTotalBrokersCount++;
+            }
+            for (Rack rack : racks.values()) {
+                rack.initialize(random);
+            }
+            this.rackNames.sort(OptionalStringComparator.INSTANCE);
+            this.numTotalBrokers = numTotalBrokersCount;
+            this.numUnfencedBrokers = numUnfencedBrokersCount;
+            this.offset = rackNames.isEmpty() ? 0 : random.nextInt(rackNames.size());
+        }
+
+        int numTotalBrokers() {
+            return numTotalBrokers;
+        }
+
+        int numUnfencedBrokers() {
+            return numUnfencedBrokers;
+        }
+
+        // VisibleForTesting
+        List<Optional<String>> rackNames() {
+            return rackNames;
+        }
+
+        List<Integer> place(int replicationFactor) {
+            if (replicationFactor <= 0) {
+                throw new InvalidReplicationFactorException("Invalid replication factor " +
+                        replicationFactor + ": the replication factor must be positive.");
+            }
+            // If we have returned as many assignments as there are unfenced brokers in
+            // the cluster, shuffle the rack list and broker lists to try to avoid
+            // repeating the same assignments again.
+            if (epoch == numUnfencedBrokers) {

Review comment:
       Should this be `(epoch % numUnfencedBrokers) == 0`?
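A quick way to compare the two conditions. Assuming (hypothetically, since the rest of `place()` is not shown in this hunk) that `epoch` is incremented once per `place()` call and never reset, `epoch == numUnfencedBrokers` can fire at most once over the lifetime of a `RackList`, whereas the modulo form fires every `numUnfencedBrokers` placements. `ReshuffleTrigger` and its methods are illustrative names only.

```java
// Hypothetical counter: how many times would each reshuffle condition fire over
// `placements` calls, if epoch takes the values 1, 2, ..., placements?
final class ReshuffleTrigger {
    static int firesWithEquals(int numUnfencedBrokers, int placements) {
        int fired = 0;
        for (int epoch = 1; epoch <= placements; epoch++) {
            if (epoch == numUnfencedBrokers) fired++;
        }
        return fired;
    }

    static int firesWithModulo(int numUnfencedBrokers, int placements) {
        // Guard against division by zero when there are no unfenced brokers.
        if (numUnfencedBrokers <= 0) return 0;
        int fired = 0;
        for (int epoch = 1; epoch <= placements; epoch++) {
            if (epoch % numUnfencedBrokers == 0) fired++;
        }
        return fired;
    }
}
```

With 2 unfenced brokers and 10 placements, the equality check fires once and the modulo check fires five times. If the modulo form is adopted, it likely needs a zero guard like the one above, unless `place()` already rejects clusters with no unfenced brokers before this check. Resetting `epoch` to 0 after each shuffle looks riskier, because the quoted `BrokerList.next` only starts a new pass when the epoch differs from the last one it saw, so a reused epoch value could leave a list reporting -1.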

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java
##########
@@ -0,0 +1,182 @@
+@Timeout(value = 40)
+public class StripedReplicaPlacerTest {
+    /**
+     * Test that the BrokerList class works as expected.
+     */
+    @Test
+    public void testBrokerList() {
+        assertEquals(0, BrokerList.EMPTY.size());
+        assertEquals(-1, BrokerList.EMPTY.next(1));
+        BrokerList brokers = new BrokerList().add(0).add(1).add(2).add(3);
+        assertEquals(4, brokers.size());
+        assertEquals(0, brokers.next(0));
+        assertEquals(1, brokers.next(0));
+        assertEquals(2, brokers.next(0));
+        assertEquals(3, brokers.next(0));
+        assertEquals(-1, brokers.next(0));
+        assertEquals(-1, brokers.next(0));
+        assertEquals(1, brokers.next(1));
+        assertEquals(2, brokers.next(1));
+        assertEquals(3, brokers.next(1));
+        assertEquals(0, brokers.next(1));
+        assertEquals(-1, brokers.next(1));
+    }
+
+    /**
+     * Test that we perform striped replica placement as expected, and don't use the
+     * fenced replica if we don't have to.
+     */
+    @Test
+    public void testAvoidFencedReplicaIfPossibleOnSingleRack() {
+        MockRandom random = new MockRandom();
+        RackList rackList = new RackList(random, Arrays.asList(
+            new UsableBroker(3, Optional.empty(), false),
+            new UsableBroker(1, Optional.empty(), true),
+            new UsableBroker(0, Optional.empty(), false),
+            new UsableBroker(4, Optional.empty(), false),
+            new UsableBroker(2, Optional.empty(), false)).iterator());
+        assertEquals(5, rackList.numTotalBrokers());
+        assertEquals(4, rackList.numUnfencedBrokers());
+        assertEquals(Collections.singletonList(Optional.empty()), rackList.rackNames());
+        assertThrows(InvalidReplicationFactorException.class, () -> rackList.place(0));
+        assertThrows(InvalidReplicationFactorException.class, () -> rackList.place(-1));
+        assertEquals(Arrays.asList(3, 4, 0, 2), rackList.place(4));
+        assertEquals(Arrays.asList(4, 0, 2, 3), rackList.place(4));
+        assertEquals(Arrays.asList(0, 2, 3, 4), rackList.place(4));
+        assertEquals(Arrays.asList(2, 3, 4, 0), rackList.place(4));
+        assertEquals(Arrays.asList(0, 4, 3, 2), rackList.place(4));
+    }
+
+    /**
+     * Test that we will place on the fenced replica if we need to.
+     */
+    @Test
+    public void testPlacementOnFencedReplicaOnSingleRack() {
+        MockRandom random = new MockRandom();
+        RackList rackList = new RackList(random, Arrays.asList(
+            new UsableBroker(3, Optional.empty(), false),
+            new UsableBroker(1, Optional.empty(), true),
+            new UsableBroker(2, Optional.empty(), false)).iterator());
+        assertEquals(3, rackList.numTotalBrokers());
+        assertEquals(2, rackList.numUnfencedBrokers());
+        assertEquals(Collections.singletonList(Optional.empty()), rackList.rackNames());
+        assertEquals(Arrays.asList(3, 2, 1), rackList.place(3));
+        assertEquals(Arrays.asList(2, 3, 1), rackList.place(3));
+        assertEquals(Arrays.asList(3, 2, 1), rackList.place(3));
+        assertEquals(Arrays.asList(2, 3, 1), rackList.place(3));
+    }
+
+    @Test
+    public void testRackListWithMultipleRacks() {
+        MockRandom random = new MockRandom();
+        RackList rackList = new RackList(random, Arrays.asList(
+            new UsableBroker(11, Optional.of("1"), false),
+            new UsableBroker(10, Optional.of("1"), false),
+            new UsableBroker(30, Optional.of("3"), false),
+            new UsableBroker(31, Optional.of("3"), false),
+            new UsableBroker(21, Optional.of("2"), false),
+            new UsableBroker(20, Optional.of("2"), true)).iterator());
+        assertEquals(6, rackList.numTotalBrokers());
+        assertEquals(5, rackList.numUnfencedBrokers());
+        assertEquals(Arrays.asList(Optional.of("1"), Optional.of("2"), Optional.of("3")), rackList.rackNames());
+        assertEquals(Arrays.asList(11, 21, 31, 10), rackList.place(4));
+        assertEquals(Arrays.asList(21, 30, 10, 20), rackList.place(4));
+        assertEquals(Arrays.asList(31, 11, 21, 30), rackList.place(4));
+    }
+
+    @Test
+    public void testRackListWithInvalidRacks() {
+        MockRandom random = new MockRandom();
+        RackList rackList = new RackList(random, Arrays.asList(
+            new UsableBroker(11, Optional.of("1"), false),
+            new UsableBroker(10, Optional.of("1"), false),
+            new UsableBroker(30, Optional.of("3"), true),
+            new UsableBroker(31, Optional.of("3"), true),
+            new UsableBroker(20, Optional.of("2"), true),
+            new UsableBroker(21, Optional.of("2"), true),
+            new UsableBroker(41, Optional.of("4"), false),
+            new UsableBroker(40, Optional.of("4"), true)).iterator());
+        assertEquals(8, rackList.numTotalBrokers());
+        assertEquals(3, rackList.numUnfencedBrokers());
+        assertEquals(Arrays.asList(Optional.of("1"),
+            Optional.of("2"),
+            Optional.of("3"),
+            Optional.of("4")), rackList.rackNames());
+        assertEquals(Arrays.asList(41, 11, 21, 30), rackList.place(4));

Review comment:
       Why didn't the leader start from rack 1, which is the first in the rack list? Also, why didn't rack 2 start with 20, which sorts first during initialization?
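As a baseline for this question, the Javadoc's zero-offset example (racks A, B, and C with three brokers each, giving A0, B0, C0 / B1, C1, A1 / C2, A2, B2) can be reproduced with the hypothetical sketch below, which fixes every offset at 0 and ignores fencing. Under those assumptions, the first leader always comes from the first sorted rack and each rack starts from its lowest broker. So the observed [41, 11, 21, 30] presumably reflects nonzero offsets from `MockRandom`, the special leader logic the Javadoc mentions, or the per-epoch advance in the quoted `BrokerList.next`. That advance alone would explain 21 before 20: with an initial offset of 0, the first call with a new epoch moves the offset to 1, so rack 2's sorted list [20, 21] would start at 21, if `place()` passes a nonzero epoch on its first call. `StripedSketch` is an illustrative name, not part of the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the striped pattern from the Javadoc, with every random
// offset fixed at 0 and fencing ignored. Assumes every rack is non-empty.
final class StripedSketch {
    private final List<List<String>> racks; // brokers per rack, racks assumed sorted
    private final int[] cursors;            // next broker position within each rack
    private int partitions = 0;             // partitions placed so far

    StripedSketch(List<List<String>> racks) {
        this.racks = racks;
        this.cursors = new int[racks.size()];
    }

    List<String> place(int replicationFactor) {
        if (replicationFactor <= 0 || replicationFactor > racks.size()) {
            // Keeps the sketch to at most one replica per rack.
            throw new IllegalArgumentException("expected 1.." + racks.size() + " replicas");
        }
        List<String> result = new ArrayList<>(replicationFactor);
        for (int i = 0; i < replicationFactor; i++) {
            // Each partition starts one rack further along: the "stripe".
            int r = (partitions + i) % racks.size();
            List<String> brokers = racks.get(r);
            // Round-robin within the rack.
            result.add(brokers.get(cursors[r] % brokers.size()));
            cursors[r]++;
        }
        partitions++;
        return result;
    }
}
```

Three calls to `place(3)` on racks [A0, A1, A2], [B0, B1, B2], [C0, C1, C2] return exactly the three partitions listed in the Javadoc.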

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java
##########
@@ -0,0 +1,411 @@
+/**
+ * The striped replica placer.
+ *
+ *
+ * GOALS
+ * The design of this placer attempts to satisfy a few competing goals.  Firstly, we want
+ * to spread the replicas as evenly as we can across racks.  In the simple case where
+ * broker racks have not been configured, this goal is a no-op, of course.  But it is the
+ * highest priority goal in multi-rack clusters.
+ *
+ * Our second goal is to spread the replicas evenly across brokers.  Since we are placing
+ * multiple partitions, we try to avoid putting each partition on the same set of
+ * replicas, even if it does satisfy the rack placement goal.  However, we treat the rack
+ * placement goal as higher priority than this goal -- if you configure 10 brokers in rack
+ * A and B, and 1 broker in rack C, you will end up with a lot of partitions on that one
+ * broker in rack C.  If you were to place a lot of partitions with replication factor 3,
+ * each partition would try to get a replica there.  In general racks are supposed to be
+ * about the same size -- if they aren't, this is a user error.
+ *
+ * Thirdly, we would prefer to place replicas on unfenced brokers, rather than on fenced
+ * brokers.
+ *
+ *
+ * CONSTRAINTS
+ * In addition to these goals, we have two constraints.  Unlike the goals, these are not
+ * optional -- they are mandatory.  Placement will fail if a constraint cannot be
+ * satisfied.  The first constraint is that we can't place more than one replica on the
+ * same broker.  This imposes an upper limit on replication factor -- for example, a 3-node
+ * cluster can't have any topics with replication factor 4.  This constraint comes from
+ * Kafka's internal design.
+ *
+ * The second constraint is that the leader of each partition must be an unfenced broker.
+ * This constraint is a bit arbitrary.  In theory, we could allow people to create
+ * new topics even if every broker were fenced.  However, this would be confusing for
+ * users.
+ *
+ *
+ * ALGORITHM
+ * The StripedReplicaPlacer constructor loads the broker data into rack objects.  Each
+ * rack object contains a sorted list of fenced brokers, and a separate sorted list of
+ * unfenced brokers.  The racks themselves are organized into a sorted list, stored inside
+ * the top-level RackList object.
+ *
+ * The general idea is that we place replicas onto racks in a round-robin fashion.  So if
+ * we had racks A, B, C, and D, and we were creating a new partition with replication
+ * factor 3, our first replica might come from A, our second from B, and our third from C.
+ * Of course our placement would not be very fair if we always started with rack A.
+ * Therefore, we generate a random starting offset when the RackList is created.  So one
+ * time we might go B, C, D.  Another time we might go C, D, A.  And so forth.
+ *
+ * Note that each partition we generate advances the starting offset by one.
+ * So in our 4-rack cluster, with 3 partitions, we might choose these racks:
+ *
+ * partition 1: A, B, C
+ * partition 2: B, C, D
+ * partition 3: C, D, A
+ *
+ * This is what generates the characteristic "striped" pattern of this placer.
+ *
+ * So far I haven't said anything about how we choose a replica from within a rack.  In
+ * fact, this is also done in a round-robin fashion.  So if rack A had brokers A0, A1, A2,
+ * and A3, we might return A0 the first time, A1 the second, A2 the third, and so on.
+ * Just like with the racks, we add a random starting offset to mix things up a bit.
+ *
+ * So let's say you had a cluster with racks A, B, and C, and each rack had 3 brokers,
+ * for 9 nodes in total.
+ * If all the offsets were 0, you'd get placements like this:
+ *
+ * partition 1: A0, B0, C0
+ * partition 2: B1, C1, A1
+ * partition 3: C2, A2, B2
+ *
+ * One additional complication with choosing a replica within a rack is that we want to
+ * choose the unfenced replicas first.  In a big cluster with lots of nodes available,
+ * we'd prefer not to place a new partition on a node that is fenced.  Therefore, we
+ * actually maintain two lists, rather than the single list I described above.
+ * We only start using the fenced node list when the unfenced node list is totally
+ * exhausted.
+ *
+ * Furthermore, we cannot place the first replica (the leader) of a new partition on a
+ * fenced broker.  Therefore, we have some special logic to ensure that this doesn't
+ * happen.
+ */
+public class StripedReplicaPlacer implements ReplicaPlacer {
+    /**
+     * A list of brokers that we can iterate through.
+     */
+    static class BrokerList {
+        final static BrokerList EMPTY = new BrokerList();
+        private final List<Integer> brokers = new ArrayList<>(0);
+        private int epoch = 0;
+        private int index = 0;
+        private int offset = 0;
+
+        BrokerList add(int broker) {
+            this.brokers.add(broker);
+            return this;
+        }
+
+        /**
+         * Initialize this broker list by sorting it and randomizing the start offset.
+         *
+         * @param random    The random number generator.
+         */
+        void initialize(Random random) {
+            if (!brokers.isEmpty()) {
+                brokers.sort(Integer::compareTo);
+                this.offset = random.nextInt(brokers.size());
+            }
+        }
+
+        /**
+         * Randomly shuffle the brokers in this list.
+         */
+        void shuffle(Random random) {
+            Collections.shuffle(brokers, random);

Review comment:
       Should we reset the offset here too?
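To make the question concrete, here is a hypothetical BrokerListSketch. Its next(epoch) behaviour is reconstructed only from the expectations in testBrokerList: each epoch returns every broker once and then -1, and each new epoch restarts the walk one position later. Its shuffle() also resets the offset, as suggested above. This is a sketch under those assumptions, not the PR's BrokerList.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical re-creation of BrokerList iteration, inferred from the
// expectations in testBrokerList; not the PR's actual implementation.
final class BrokerListSketch {
    private final List<Integer> brokers = new ArrayList<>();
    private int epoch = 0;   // placement round that index belongs to
    private int index = 0;   // brokers already returned in this epoch
    private int offset = 0;  // list position this epoch starts from

    BrokerListSketch add(int broker) {
        brokers.add(broker);
        return this;
    }

    // Returns the next broker for this epoch, or -1 once every broker has been
    // returned. A new epoch restarts the walk one position further along.
    int next(int epoch) {
        if (brokers.isEmpty()) return -1;
        if (this.epoch != epoch) {
            this.epoch = epoch;
            this.index = 0;
            this.offset = (offset + 1) % brokers.size();
        }
        if (index >= brokers.size()) return -1;
        int broker = brokers.get((offset + index) % brokers.size());
        index++;
        return broker;
    }

    // Shuffle, then reset the offset as the review suggests: the offset carried
    // over from the old order no longer means anything for the new one.
    void shuffle(Random random) {
        Collections.shuffle(brokers, random);
        offset = 0;
    }

    public static void main(String[] args) {
        BrokerListSketch list = new BrokerListSketch().add(0).add(1).add(2).add(3);
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < 5; i++) out.append(list.next(0)).append(' ');
        for (int i = 0; i < 5; i++) out.append(list.next(1)).append(' ');
        System.out.println(out.toString().trim()); // prints "0 1 2 3 -1 1 2 3 0 -1"
    }
}
```

Resetting to 0, rather than drawing a fresh random offset, seems sufficient here because the shuffle has already randomized the order. In this sketch, the next epoch change still advances the offset to 1.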

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java
##########
@@ -0,0 +1,411 @@
+/**
+ * The striped replica placer.
+ *
+ *
+ * GOALS
+ * The design of this placer attempts to satisfy a few competing goals.  Firstly, we want
+ * to spread the replicas as evenly as we can across racks.  In the simple case where
+ * broker racks have not been configured, this goal is a no-op, of course.  But it is the
+ * highest priority goal in multi-rack clusters.
+ *
+ * Our second goal is to spread the replicas evenly across brokers.  Since we are placing
+ * multiple partitions, we try to avoid putting each partition on the same set of
+ * replicas, even if it does satisfy the rack placement goal.  However, we treat the rack
+ * placement goal as higher priority than this goal -- if you configure 10 brokers in rack
+ * A and B, and 1 broker in rack C, you will end up with a lot of partitions on that one
+ * broker in rack C.  If you were to place a lot of partitions with replication factor 3,
+ * each partition would try to get a replica there.  In general racks are supposed to be
+ * about the same size -- if they aren't, this is a user error.
+ *
+ * Thirdly, we would prefer to place replicas on unfenced brokers, rather than on fenced

Review comment:
       It would be useful to document another goal: for partitions with the same 1st replica, we want to distribute the second replica for those partitions evenly. This way, if the first broker fails, the new leaders will be evenly distributed among the surviving brokers. The algorithm achieves that by forcing a shuffle when the partition index is a multiple of the number of brokers.
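The mechanism described above can be illustrated with a small, hypothetical single-rack model. LeaderFailoverSpread and its parameters are illustrative only, and real placement also involves racks and fencing. In the model, leaders are assigned round-robin over a broker order, the second replica is the next broker in that order, and the order is reshuffled whenever the partition index is a multiple of the broker count. Without the reshuffle, each leader always gets the same second replica, so all of its partitions would fail over to a single broker.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

// Hypothetical single-rack model: leaders go round-robin over a broker order,
// the second replica is the next broker in that order, and the order is
// reshuffled whenever the partition index is a multiple of the broker count.
final class LeaderFailoverSpread {
    // Maps each leader to the set of brokers that held its second replica.
    static Map<Integer, Set<Integer>> secondReplicas(int numBrokers, int numPartitions,
                                                     boolean reshuffle, Random random) {
        List<Integer> order = new ArrayList<>();
        for (int b = 0; b < numBrokers; b++) {
            order.add(b);
        }
        Map<Integer, Set<Integer>> followers = new HashMap<>();
        for (int p = 0; p < numPartitions; p++) {
            if (reshuffle && p > 0 && p % numBrokers == 0) {
                Collections.shuffle(order, random); // start a new round
            }
            int leader = order.get(p % numBrokers);
            int second = order.get((p + 1) % numBrokers);
            followers.computeIfAbsent(leader, k -> new HashSet<>()).add(second);
        }
        return followers;
    }

    public static void main(String[] args) {
        // Without reshuffling, broker 0's partitions always fail over to broker 1.
        System.out.println("fixed order: " + secondReplicas(6, 600, false, new Random(42)).get(0));
        // With reshuffling, broker 0's second replicas vary from round to round.
        System.out.println("reshuffled:  " + secondReplicas(6, 600, true, new Random(42)).get(0));
    }
}
```

Because each round of numBrokers partitions uses one permutation, every broker still leads exactly once per round. The reshuffle only changes who follows whom.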

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java
##########
@@ -0,0 +1,182 @@
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.controller.StripedReplicaPlacer.BrokerList;
+import org.apache.kafka.controller.StripedReplicaPlacer.RackList;
+import org.apache.kafka.metadata.UsableBroker;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+
+@Timeout(value = 40)
+public class StripedReplicaPlacerTest {
+    /**
+     * Test that the BrokerList class works as expected.
+     */
+    @Test
+    public void testBrokerList() {
+        assertEquals(0, BrokerList.EMPTY.size());
+        assertEquals(-1, BrokerList.EMPTY.next(1));
+        BrokerList brokers = new BrokerList().add(0).add(1).add(2).add(3);
+        assertEquals(4, brokers.size());
+        assertEquals(0, brokers.next(0));
+        assertEquals(1, brokers.next(0));
+        assertEquals(2, brokers.next(0));
+        assertEquals(3, brokers.next(0));
+        assertEquals(-1, brokers.next(0));
+        assertEquals(-1, brokers.next(0));
+        assertEquals(1, brokers.next(1));
+        assertEquals(2, brokers.next(1));
+        assertEquals(3, brokers.next(1));
+        assertEquals(0, brokers.next(1));
+        assertEquals(-1, brokers.next(1));
+    }
+
+    /**
+     * Test that we perform striped replica placement as expected, and don't use the
+     * fenced replica if we don't have to.
+     */
+    @Test
+    public void testAvoidFencedReplicaIfPossibleOnSingleRack() {
+        MockRandom random = new MockRandom();
+        RackList rackList = new RackList(random, Arrays.asList(
+            new UsableBroker(3, Optional.empty(), false),
+            new UsableBroker(1, Optional.empty(), true),
+            new UsableBroker(0, Optional.empty(), false),
+            new UsableBroker(4, Optional.empty(), false),
+            new UsableBroker(2, Optional.empty(), false)).iterator());
+        assertEquals(5, rackList.numTotalBrokers());
+        assertEquals(4, rackList.numUnfencedBrokers());
+        assertEquals(Collections.singletonList(Optional.empty()), rackList.rackNames());
+        assertThrows(InvalidReplicationFactorException.class, () -> rackList.place(0));
+        assertThrows(InvalidReplicationFactorException.class, () -> rackList.place(-1));
+        assertEquals(Arrays.asList(3, 4, 0, 2), rackList.place(4));
+        assertEquals(Arrays.asList(4, 0, 2, 3), rackList.place(4));
+        assertEquals(Arrays.asList(0, 2, 3, 4), rackList.place(4));
+        assertEquals(Arrays.asList(2, 3, 4, 0), rackList.place(4));
+        assertEquals(Arrays.asList(0, 4, 3, 2), rackList.place(4));
+    }
+
+    /**
+     * Test that we will place on the fenced replica if we need to.
+     */
+    @Test
+    public void testPlacementOnFencedReplicaOnSingleRack() {
+        MockRandom random = new MockRandom();
+        RackList rackList = new RackList(random, Arrays.asList(
+            new UsableBroker(3, Optional.empty(), false),
+            new UsableBroker(1, Optional.empty(), true),
+            new UsableBroker(2, Optional.empty(), false)).iterator());
+        assertEquals(3, rackList.numTotalBrokers());
+        assertEquals(2, rackList.numUnfencedBrokers());
+        assertEquals(Collections.singletonList(Optional.empty()), rackList.rackNames());
+        assertEquals(Arrays.asList(3, 2, 1), rackList.place(3));
+        assertEquals(Arrays.asList(2, 3, 1), rackList.place(3));
+        assertEquals(Arrays.asList(3, 2, 1), rackList.place(3));
+        assertEquals(Arrays.asList(2, 3, 1), rackList.place(3));
+    }
+
+    @Test
+    public void testRackListWithMultipleRacks() {
+        MockRandom random = new MockRandom();
+        RackList rackList = new RackList(random, Arrays.asList(
+            new UsableBroker(11, Optional.of("1"), false),
+            new UsableBroker(10, Optional.of("1"), false),
+            new UsableBroker(30, Optional.of("3"), false),
+            new UsableBroker(31, Optional.of("3"), false),
+            new UsableBroker(21, Optional.of("2"), false),
+            new UsableBroker(20, Optional.of("2"), true)).iterator());
+        assertEquals(6, rackList.numTotalBrokers());
+        assertEquals(5, rackList.numUnfencedBrokers());
+        assertEquals(Arrays.asList(Optional.of("1"), Optional.of("2"), Optional.of("3")), rackList.rackNames());
+        assertEquals(Arrays.asList(11, 21, 31, 10), rackList.place(4));
+        assertEquals(Arrays.asList(21, 30, 10, 20), rackList.place(4));
+        assertEquals(Arrays.asList(31, 11, 21, 30), rackList.place(4));
+    }
+
+    @Test
+    public void testRackListWithInvalidRacks() {
+        MockRandom random = new MockRandom();
+        RackList rackList = new RackList(random, Arrays.asList(
+            new UsableBroker(11, Optional.of("1"), false),
+            new UsableBroker(10, Optional.of("1"), false),
+            new UsableBroker(30, Optional.of("3"), true),
+            new UsableBroker(31, Optional.of("3"), true),
+            new UsableBroker(20, Optional.of("2"), true),
+            new UsableBroker(21, Optional.of("2"), true),
+            new UsableBroker(41, Optional.of("4"), false),
+            new UsableBroker(40, Optional.of("4"), true)).iterator());
+        assertEquals(8, rackList.numTotalBrokers());
+        assertEquals(3, rackList.numUnfencedBrokers());
+        assertEquals(Arrays.asList(Optional.of("1"),
+            Optional.of("2"),
+            Optional.of("3"),
+            Optional.of("4")), rackList.rackNames());
+        assertEquals(Arrays.asList(41, 11, 21, 30), rackList.place(4));
+        assertEquals(Arrays.asList(10, 20, 31, 41), rackList.place(4));
+        assertEquals(Arrays.asList(41, 21, 30, 11), rackList.place(4));
+    }
+
+    @Test
+    public void testAllBrokersFenced() {
+        MockRandom random = new MockRandom();
+        StripedReplicaPlacer placer = new StripedReplicaPlacer(random);
+        assertEquals("All brokers are currently fenced.",
+            assertThrows(InvalidReplicationFactorException.class,
+                () -> placer.place(0, 1, (short) 1, Arrays.asList(
+                    new UsableBroker(11, Optional.of("1"), true),
+                    new UsableBroker(10, Optional.of("1"), true)).iterator())).getMessage());
+    }
+
+    @Test
+    public void testNotEnoughBrokers() {
+        MockRandom random = new MockRandom();
+        StripedReplicaPlacer placer = new StripedReplicaPlacer(random);
+        assertEquals("The target replication factor of 3 cannot be reached because only " +
+            "2 broker(s) are registered.",
+            assertThrows(InvalidReplicationFactorException.class,
+                () -> placer.place(0, 1, (short) 3, Arrays.asList(
+                    new UsableBroker(11, Optional.of("1"), false),
+                    new UsableBroker(10, Optional.of("1"), false)).iterator())).getMessage());
+    }
+
+    @Test
+    public void testSuccessfulPlacement() {

Review comment:
       Could we add a test that verifies not only that the first replicas are distributed evenly, but also that, for partitions with the same first replica, their second replicas are distributed evenly?
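One possible shape for such a check is sketched below as a standalone helper. SecondReplicaBalance and worstSpread are hypothetical names. The helper groups placements by first replica, counts how often each other broker is the second replica, and reports the largest max-minus-min gap across all leaders. A JUnit test could collect the replica lists produced for many partitions and assert that the gap stays within a small tolerance, such as 1. That tolerance, and the way placements are obtained from StripedReplicaPlacer.place, are assumptions that this hunk does not settle.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical test helper; not part of StripedReplicaPlacerTest.
final class SecondReplicaBalance {
    // For each leader, counts how often every other broker is the second
    // replica, and returns the largest (max - min) gap over all leaders.
    static int worstSpread(List<List<Integer>> placements, Collection<Integer> brokers) {
        Map<Integer, Map<Integer, Integer>> counts = new HashMap<>();
        for (List<Integer> replicas : placements) {
            if (replicas.size() < 2) continue; // no second replica to count
            counts.computeIfAbsent(replicas.get(0), k -> new HashMap<>())
                  .merge(replicas.get(1), 1, Integer::sum);
        }
        int worst = 0;
        for (Map.Entry<Integer, Map<Integer, Integer>> e : counts.entrySet()) {
            int min = Integer.MAX_VALUE;
            int max = 0;
            for (int b : brokers) {
                if (b == e.getKey()) continue; // a leader is never its own follower
                int c = e.getValue().getOrDefault(b, 0);
                min = Math.min(min, c);
                max = Math.max(max, c);
            }
            worst = Math.max(worst, max - min);
        }
        return worst;
    }

    public static void main(String[] args) {
        List<Integer> brokers = Arrays.asList(0, 1, 2);
        List<List<Integer>> even = Arrays.asList(
            Arrays.asList(0, 1), Arrays.asList(0, 2), Arrays.asList(1, 2),
            Arrays.asList(1, 0), Arrays.asList(2, 0), Arrays.asList(2, 1));
        List<List<Integer>> skewed = Arrays.asList(
            Arrays.asList(0, 1), Arrays.asList(0, 1), Arrays.asList(1, 2),
            Arrays.asList(1, 2), Arrays.asList(2, 0), Arrays.asList(2, 0));
        System.out.println(worstSpread(even, brokers));   // prints 0
        System.out.println(worstSpread(skewed, brokers)); // prints 2
    }
}
```

Brokers that never appear as a second replica are counted as 0. This means a leader whose partitions all fail over to the same broker is flagged, even if every individual count looks small.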




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

