mumrah commented on code in PR #12033:
URL: https://github.com/apache/kafka/pull/12033#discussion_r851455763
##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -132,6 +135,102 @@
* of each partition, as well as administrative tasks like creating or deleting topics.
*/
public class ReplicationControlManager {
+ static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000;
+
+ static class Builder {
+ private SnapshotRegistry snapshotRegistry = null;
+ private LogContext logContext = null;
+ private short defaultReplicationFactor = (short) 3;
+ private int defaultNumPartitions = 1;
+ private int maxElectionsPerImbalance = MAX_ELECTIONS_PER_IMBALANCE;
+ private boolean isLeaderRecoverySupported = true;
+ private ConfigurationControlManager configurationControl = null;
+ private ClusterControlManager clusterControl = null;
+ private ControllerMetrics controllerMetrics = null;
+ private Optional<CreateTopicPolicy> createTopicPolicy = Optional.empty();
+
+ Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+ this.snapshotRegistry = snapshotRegistry;
+ return this;
+ }
+
+ Builder setLogContext(LogContext logContext) {
+ this.logContext = logContext;
+ return this;
+ }
+
+ Builder setDefaultReplicationFactor(short defaultReplicationFactor) {
+ this.defaultReplicationFactor = defaultReplicationFactor;
+ return this;
+ }
+
+ Builder setDefaultNumPartitions(int defaultNumPartitions) {
+ this.defaultNumPartitions = defaultNumPartitions;
+ return this;
+ }
+
+ Builder setMaxElectionsPerImbalance(int maxElectionsPerImbalance) {
+ this.maxElectionsPerImbalance = maxElectionsPerImbalance;
+ return this;
+ }
+
+ Builder setIsLeaderRecoverySupported(boolean isLeaderRecoverySupported) {
+ this.isLeaderRecoverySupported = isLeaderRecoverySupported;
+ return this;
+ }
+
+ Builder setConfigurationControl(ConfigurationControlManager configurationControl) {
+ this.configurationControl = configurationControl;
+ return this;
+ }
+
+ Builder setClusterControl(ClusterControlManager clusterControl) {
+ this.clusterControl = clusterControl;
+ return this;
+ }
+
+ Builder setControllerMetrics(ControllerMetrics controllerMetrics) {
+ this.controllerMetrics = controllerMetrics;
+ return this;
+ }
+
+ Builder setCreateTopicPolicy(Optional<CreateTopicPolicy> createTopicPolicy) {
+ this.createTopicPolicy = createTopicPolicy;
+ return this;
+ }
+
+ ReplicationControlManager build() {
+ if (configurationControl == null) {
+ throw new RuntimeException("You must specify configurationControl.");
+ }
+ if (clusterControl == null) {
+ throw new RuntimeException("You must specify clusterControl.");
+ }
+ if (controllerMetrics == null) {
+ throw new RuntimeException("You must specify controllerMetrics.");
+ }
+ if (logContext == null) logContext = new LogContext();
+ if (snapshotRegistry == null) snapshotRegistry = configurationControl.snapshotRegistry();
+ return new ReplicationControlManager(snapshotRegistry,
+ logContext,
+ defaultReplicationFactor,
+ defaultNumPartitions,
+ maxElectionsPerImbalance,
+ isLeaderRecoverySupported,
+ configurationControl,
+ clusterControl,
+ controllerMetrics,
+ createTopicPolicy);
+ }
+ }
+
+ class KRaftClusterDescriber implements ClusterDescriber {
+ @Override
+ public Iterator<UsableBroker> usableBrokers() {
+ return clusterControl.usableBrokers();
+ }
+ }
Review Comment:
Since ClusterDescriber is a SAM (single abstract method) interface, we don't
need this wrapper class: we can just pass `clusterControl::usableBrokers`
when we call ReplicaPlacer#place.
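
To illustrate the suggestion, here is a minimal, self-contained sketch. `Broker`, `Describer`, `ClusterControl`, and `place` are hypothetical stand-ins for `UsableBroker`, `ClusterDescriber`, `ClusterControlManager`, and `ReplicaPlacer#place`; they are not the real Kafka signatures.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class SamSketch {
    // Hypothetical stand-in for UsableBroker; it only carries an id.
    static final class Broker {
        final int id;
        Broker(int id) { this.id = id; }
    }

    // Hypothetical stand-in for ClusterDescriber: exactly one abstract
    // method, so any lambda or method reference of this shape fits.
    interface Describer {
        Iterator<Broker> usableBrokers();
    }

    // Hypothetical stand-in for ClusterControlManager.
    static final class ClusterControl {
        private final List<Broker> brokers =
            Arrays.asList(new Broker(0), new Broker(1), new Broker(2));

        Iterator<Broker> usableBrokers() {
            return brokers.iterator();
        }
    }

    // Hypothetical stand-in for ReplicaPlacer#place; it just counts the
    // brokers the describer exposes.
    static int place(Describer cluster) {
        int count = 0;
        Iterator<Broker> it = cluster.usableBrokers();
        while (it.hasNext()) {
            it.next();
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        ClusterControl clusterControl = new ClusterControl();
        // The method reference itself acts as the Describer; no named
        // implementing class is required.
        System.out.println(place(clusterControl::usableBrokers));
    }
}
```

With this shape, the inner `KRaftClusterDescriber` class would no longer be needed at the call site.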
##########
metadata/src/main/java/org/apache/kafka/metadata/placement/PlacementSpec.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.placement;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Objects;
+
+
+/**
+ * Specifies a replica placement that we want to make.
+ */
[email protected]
Review Comment:
Should we include this hint on more things in the `metadata` package? Is it
intended as a signal to implementers of other components within Kafka (such as
those in the `core` package)?
##########
metadata/src/main/java/org/apache/kafka/metadata/placement/ClusterDescriber.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.placement;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Iterator;
+
+
+/**
+ * Can describe a cluster to a ReplicaPlacer.
+ */
[email protected]
+public interface ClusterDescriber {
Review Comment:
If we go with my suggestion above (passing a method reference), we should
mark this as a `@FunctionalInterface` as well.
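
For reference, a small sketch of what the annotation buys us. `Describer` is a hypothetical stand-in for `ClusterDescriber`, and the `hasUsableBrokers` default method is invented purely for illustration. The annotation makes the compiler reject the interface if it ever stops having exactly one abstract method, while default methods remain allowed.

```java
import java.util.Collections;
import java.util.Iterator;

public class FunctionalInterfaceSketch {
    // Hypothetical stand-in for ClusterDescriber.
    @FunctionalInterface
    interface Describer {
        Iterator<Integer> usableBrokers();

        // Default methods do not count against the single-abstract-method
        // rule. Adding a second abstract method would fail to compile.
        default boolean hasUsableBrokers() {
            return usableBrokers().hasNext();
        }
    }

    public static void main(String[] args) {
        // Both lambdas are valid Describer instances.
        Describer empty = () -> Collections.<Integer>emptyIterator();
        Describer one = () -> Collections.singletonList(7).iterator();

        System.out.println(empty.hasUsableBrokers());
        System.out.println(one.hasUsableBrokers());
    }
}
```

The annotation is optional for lambdas to work; its value is that it documents the intent and guards callers that pass method references against a later change to the interface.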
##########
core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala:
##########
@@ -65,22 +108,8 @@ class ControllerConfigurationValidator extends ConfigurationValidator {
nullTopicConfigs.mkString(","))
}
LogConfig.validate(properties)
- case BROKER =>
- if (resource.name().nonEmpty) {
- val brokerId = try {
- Integer.valueOf(resource.name())
- } catch {
- case _: NumberFormatException =>
- throw new InvalidRequestException("Unable to parse broker name as a base 10 number.")
- }
- if (brokerId < 0) {
- throw new InvalidRequestException("Invalid negative broker ID.")
- }
- }
- case _ =>
- // Note: we should never handle BROKER_LOGGER resources here, since changes to
- // those resources are not persisted in the metadata.
- throw new InvalidRequestException(s"Unknown resource type ${resource.`type`}")
+ case BROKER => validateBrokerName(resource.name())
+ case _ => throwExceptionForUnknownResourceType(resource)
}
}
}
Review Comment:
nit: can we fix this missing newline while we're editing this file?