dajac commented on code in PR #12240:
URL: https://github.com/apache/kafka/pull/12240#discussion_r889876056
##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -682,25 +683,35 @@ private ApiError createTopic(CreatableTopic topic,
         short replicationFactor = topic.replicationFactor() == -1 ?
             defaultReplicationFactor : topic.replicationFactor();
         try {
-            List<List<Integer>> replicas = clusterControl.replicaPlacer().place(new PlacementSpec(
+            List<List<Integer>> partitions = clusterControl.replicaPlacer().place(new PlacementSpec(
                 0,
                 numPartitions,
                 replicationFactor
             ), clusterDescriber);
-            for (int partitionId = 0; partitionId < replicas.size(); partitionId++) {
-                int[] r = Replicas.toArray(replicas.get(partitionId));
+            for (int partitionId = 0; partitionId < partitions.size(); partitionId++) {
+                List<Integer> replicas = partitions.get(partitionId);
+                List<Integer> isr = replicas.stream().
+                    filter(clusterControl::active).collect(Collectors.toList());
+                // We need to have at least one replica in the ISR.
+                if (isr.isEmpty()) isr.add(replicas.get(0));
Review Comment:
That's a good point. Thinking more about this, I think that we should just
fail the creation in this case. At the moment, the placer fails the assignment
if all the brokers are fenced, so extending this to also cover brokers in
controlled shutdown seems appropriate.

I still handle the case here for now. However, I think that we should update
the placer to handle this directly. I would like to do that as a follow-up if
you don't mind, as it is a bigger change.
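For reference, here is a standalone sketch of the fail-instead-of-fallback behavior discussed above. It is illustrative only: `computeIsr` and `PlacementException` are hypothetical stand-ins for the real controller path (the filter over active brokers mirrors the diff; the exception replaces the `isr.add(replicas.get(0))` fallback), not the actual `ReplicationControlManager` API.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class IsrSketch {
    // Hypothetical stand-in for surfacing an ApiError from topic creation.
    static class PlacementException extends RuntimeException {
        PlacementException(String message) { super(message); }
    }

    // Build the initial ISR by keeping only active brokers, as in the diff.
    // Instead of falling back to a fenced replica when the ISR would be
    // empty, reject the placement outright.
    static List<Integer> computeIsr(List<Integer> replicas, Predicate<Integer> isActive) {
        List<Integer> isr = replicas.stream()
            .filter(isActive)
            .collect(Collectors.toList());
        if (isr.isEmpty()) {
            throw new PlacementException(
                "No active broker among assigned replicas " + replicas);
        }
        return isr;
    }

    public static void main(String[] args) {
        // Brokers 1 and 3 are active; broker 2 is fenced or in controlled shutdown.
        Predicate<Integer> active = id -> id == 1 || id == 3;
        System.out.println(computeIsr(List.of(1, 2, 3), active));
        try {
            computeIsr(List.of(2), active);
        } catch (PlacementException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Pushing this check into the placer itself (the proposed follow-up) would mean the assignment never reaches this point with only inactive brokers in the first place.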
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]