jsancio commented on code in PR #12240:
URL: https://github.com/apache/kafka/pull/12240#discussion_r889716710
##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -444,6 +481,46 @@ public void testCreateTopics() throws Exception {
ctx.replicationControl.iterator(Long.MAX_VALUE));
}
+ @Test
+ public void testCreateTopicsInvariants() throws Exception {
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+ ReplicationControlManager replicationControl = ctx.replicationControl;
+
+ CreateTopicsRequestData request = new CreateTopicsRequestData();
+ request.topics().add(new CreatableTopic().setName("foo").
+ setNumPartitions(-1).setReplicationFactor((short) -1));
+
+ ctx.registerBrokers(0, 1, 2);
+ ctx.unfenceBrokers(0, 1);
+ ctx.inControlledShutdownBrokers(1);
+
+ ControllerResult<CreateTopicsResponseData> result =
+ replicationControl.createTopics(request, Collections.singleton("foo"));
+
+ CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
+ expectedResponse.topics().add(new CreatableTopicResult().setName("foo").
+ setNumPartitions(1).setReplicationFactor((short) 3).
+ setErrorMessage(null).setErrorCode((short) 0).
+ setTopicId(result.response().topics().find("foo").topicId()));
+ assertEquals(expectedResponse, result.response());
+
+ ctx.replay(result.records());
+
+ // Broker 2 cannot be in the ISR because it is fenced and broker 1
+ // cannot be in the ISR because it is in controlled shutdown.
Review Comment:
Can we add a test that confirms automatic replica assignment when all of the
brokers are fenced and/or shutting down?
##########
metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java:
##########
@@ -213,12 +230,29 @@ public String toString() {
bld.append("}");
bld.append(", rack=").append(rack);
bld.append(", fenced=").append(fenced);
+ bld.append(", inControlledShutdown=").append(inControlledShutdown);
bld.append(")");
return bld.toString();
}
public BrokerRegistration cloneWithFencing(boolean fencing) {
return new BrokerRegistration(id, epoch, incarnationId, listeners,
- supportedFeatures, rack, fencing);
+ supportedFeatures, rack, fencing, inControlledShutdown);
+ }
Review Comment:
What do you think about removing this method and replacing all of its calls
with `cloneWith`?
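A sketch of what that could look like, assuming `cloneWith` takes one `Optional<Boolean>` per changeable field, where an empty `Optional` means "keep the current value". `RegistrationSketch` is a hypothetical, trimmed-down stand-in for `BrokerRegistration` with only `id`, `fenced` and `inControlledShutdown`. The PR's actual `cloneWith` signature may differ.

```java
import java.util.Optional;

/** Hypothetical, trimmed-down stand-in for BrokerRegistration. */
final class RegistrationSketch {
    private final int id;
    private final boolean fenced;
    private final boolean inControlledShutdown;

    RegistrationSketch(int id, boolean fenced, boolean inControlledShutdown) {
        this.id = id;
        this.fenced = fenced;
        this.inControlledShutdown = inControlledShutdown;
    }

    int id() { return id; }
    boolean fenced() { return fenced; }
    boolean inControlledShutdown() { return inControlledShutdown; }

    /**
     * Single replacement for cloneWithFencing(boolean): an empty Optional keeps the
     * current value, so call sites never restate fields they are not changing.
     */
    RegistrationSketch cloneWith(Optional<Boolean> fencingChange,
                                 Optional<Boolean> inControlledShutdownChange) {
        return new RegistrationSketch(id,
            fencingChange.orElse(fenced),
            inControlledShutdownChange.orElse(inControlledShutdown));
    }
}
```

With this shape, `cloneWithFencing(true)` becomes `cloneWith(Optional.of(true), Optional.empty())`. Adding another changeable field then touches one method instead of every clone helper.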
##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -491,6 +540,18 @@ public boolean unfenced(int brokerId) {
return !registration.fenced();
}
+ public boolean inControlledShutdown(int brokerId) {
Review Comment:
Let's add Javadoc explaining that a broker that doesn't exist is not
shutting down. It looks like this is only used by tests. Should we document
that to avoid its use in `src/main`? Since this is only used in tests, how
about this signature instead: `Optional<Boolean> inControlledShutdown(int)`?
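A minimal sketch of the suggested signature and Javadoc, assuming registrations are looked up by broker id. With an `Optional` return, "unknown broker" and "not shutting down" are distinct results. `ClusterControlSketch` and its map are hypothetical stand-ins for the registration state in `ClusterControlManager`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for the broker registrations held by ClusterControlManager. */
final class ClusterControlSketch {
    // brokerId -> inControlledShutdown flag of its current registration
    private final Map<Integer, Boolean> inControlledShutdownById = new HashMap<>();

    void register(int brokerId, boolean inControlledShutdown) {
        inControlledShutdownById.put(brokerId, inControlledShutdown);
    }

    /**
     * Returns whether a broker is in controlled shutdown.
     *
     * <p>Visible for testing only; code under src/main should not depend on it.
     *
     * @param brokerId The broker id.
     * @return Optional.empty() if no broker with this id is registered, otherwise
     *         whether the registered broker is in controlled shutdown.
     */
    Optional<Boolean> inControlledShutdown(int brokerId) {
        return Optional.ofNullable(inControlledShutdownById.get(brokerId));
    }
}
```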
##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -132,15 +143,27 @@ ClusterControlManager build() {
replicaPlacer = new StripedReplicaPlacer(new Random());
}
if (controllerMetrics == null) {
- throw new RuntimeException("You must specify controllerMetrics");
+ throw new RuntimeException("You must specify ControllerMetrics");
+ }
+ if (featureControl == null) {
+ featureControl = new FeatureControlManager.Builder().
+ setLogContext(logContext).
+ setSnapshotRegistry(snapshotRegistry).
+ setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(),
+ QuorumFeatures.defaultFeatureMap(),
+ singletonList(0))).
+ setMetadataVersion(MetadataVersion.latest()).
+ build();
Review Comment:
Should `build` throw an `IllegalStateException` instead? I am worried that
this runs the risk of having multiple `FeatureControlManager` instances in a
controller. This feature control manager won't get updated when the metadata
version changes.
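A sketch of the fail-fast alternative, using hypothetical placeholder types rather than the real `FeatureControlManager` and `ClusterControlManager.Builder`. The builder refuses to invent a default, so the controller has to pass in its single shared instance. One trade-off: tests that currently rely on the default would need to supply a feature manager explicitly.

```java
/** Hypothetical placeholder for the controller's single FeatureControlManager. */
final class FeatureControlPlaceholder { }

/** Hypothetical, trimmed-down ClusterControlManager. */
final class ClusterControlWithFeatures {
    final FeatureControlPlaceholder featureControl;

    private ClusterControlWithFeatures(FeatureControlPlaceholder featureControl) {
        this.featureControl = featureControl;
    }

    static final class Builder {
        private FeatureControlPlaceholder featureControl = null;

        Builder setFeatureControl(FeatureControlPlaceholder featureControl) {
            this.featureControl = featureControl;
            return this;
        }

        ClusterControlWithFeatures build() {
            if (featureControl == null) {
                // Fail fast instead of creating a private default that would never
                // observe metadata.version changes made through the shared instance.
                throw new IllegalStateException("You must specify FeatureControlManager");
            }
            return new ClusterControlWithFeatures(featureControl);
        }
    }
}
```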
##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -410,15 +450,22 @@ public void replay(BrokerRegistrationChangeRecord record) {
throw new RuntimeException(String.format("Unable to replay %s: unknown " +
"value for fenced field: %d", record.toString(), record.fenced()));
}
+ Optional<BrokerRegistrationInControlledShutdownChange> inControlledShutdownChange =
+ BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown());
+ if (!inControlledShutdownChange.isPresent()) {
+ throw new RuntimeException(String.format("Unable to replay %s: unknown " +
+ "value for inControlledShutdown field: %d", record.toString(), record.inControlledShutdown()));
Review Comment:
How about `IllegalStateException`?
The `toString()` in `record.toString()` shouldn't be needed; it is implicit
since the format string uses `%s`.
##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -682,25 +683,35 @@ private ApiError createTopic(CreatableTopic topic,
short replicationFactor = topic.replicationFactor() == -1 ?
defaultReplicationFactor : topic.replicationFactor();
try {
- List<List<Integer>> replicas = clusterControl.replicaPlacer().place(new PlacementSpec(
+ List<List<Integer>> partitions = clusterControl.replicaPlacer().place(new PlacementSpec(
0,
numPartitions,
replicationFactor
), clusterDescriber);
- for (int partitionId = 0; partitionId < replicas.size(); partitionId++) {
- int[] r = Replicas.toArray(replicas.get(partitionId));
+ for (int partitionId = 0; partitionId < partitions.size(); partitionId++) {
+ List<Integer> replicas = partitions.get(partitionId);
+ List<Integer> isr = replicas.stream().
+ filter(clusterControl::active).collect(Collectors.toList());
+ // We need to have at least one replica in the ISR.
+ if (isr.isEmpty()) isr.add(replicas.get(0));
Review Comment:
Since all of the replicas are inactive, why not add them all to the ISR? The
controller doesn't know which replica will come online first. With this
implementation the controller is saying that the first replica needs to come
online first. I think this is unnecessarily strict.
This comment also applies to the create partitions RPC.
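A minimal sketch of the suggested rule: keep the active replicas if there are any, otherwise put every assigned replica in the ISR. The `isActive` predicate is a hypothetical stand-in for `clusterControl::active`. The sketch also avoids calling `add` on the `Collectors.toList()` result, since that collector makes no guarantee about the mutability of the list it returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class InitialIsrSketch {
    /**
     * Initial ISR for a newly created partition: the active replicas if there are
     * any; otherwise every assigned replica, so whichever one comes online first
     * can be elected leader.
     */
    static List<Integer> initialIsr(List<Integer> replicas, Predicate<Integer> isActive) {
        List<Integer> active = replicas.stream()
            .filter(isActive)
            .collect(Collectors.toList());
        return active.isEmpty() ? new ArrayList<>(replicas) : active;
    }
}
```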
##########
metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java:
##########
@@ -102,10 +104,34 @@ public void replay(UnfenceBrokerRecord record) {
public void replay(BrokerRegistrationChangeRecord record) {
BrokerRegistration broker = getBrokerOrThrow(record.brokerId(), record.brokerEpoch(), "change");
- if (record.fenced() < 0) {
- changedBrokers.put(record.brokerId(), Optional.of(broker.cloneWithFencing(false)));
- } else if (record.fenced() > 0) {
- changedBrokers.put(record.brokerId(), Optional.of(broker.cloneWithFencing(true)));
+ Optional<BrokerRegistrationFencingChange> fencingChange =
+ BrokerRegistrationFencingChange.fromValue(record.fenced());
+ if (!fencingChange.isPresent()) {
+ throw new IllegalStateException(String.format("Unable to replay %s: unknown " +
+ "value for fenced field: %d", record.toString(), record.fenced()));
+ }
+ Optional<BrokerRegistrationInControlledShutdownChange> inControlledShutdownChange =
+ BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown());
+ if (!inControlledShutdownChange.isPresent()) {
+ throw new IllegalStateException(String.format("Unable to replay %s: unknown " +
+ "value for inControlledShutdown field: %d", record.toString(), record.inControlledShutdown()));
+ }
+ }
Review Comment:
I think you can simplify this handling by using `Optional#orElseThrow`.
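A sketch of the `orElseThrow` form, using a hypothetical `FencingChangeSketch` enum in place of `BrokerRegistrationFencingChange`. Its values (-1 unfence, 0 none, 1 fence) follow the sign convention in the removed lines (`fenced() < 0` unfences, `> 0` fences), and the byte parameter type is an assumption. The `isPresent()` check and the later `get()` collapse into one expression.

```java
import java.util.Arrays;
import java.util.Optional;

/** Hypothetical stand-in for BrokerRegistrationFencingChange and its fromValue lookup. */
enum FencingChangeSketch {
    UNFENCE(-1), NONE(0), FENCE(1);

    private final byte value;

    FencingChangeSketch(int value) {
        this.value = (byte) value;
    }

    static Optional<FencingChangeSketch> fromValue(byte value) {
        return Arrays.stream(values()).filter(c -> c.value == value).findFirst();
    }
}

final class ReplayParsingSketch {
    /** One expression replaces the isPresent() check followed by get(). */
    static FencingChangeSketch fencingChange(Object record, byte fenced) {
        return FencingChangeSketch.fromValue(fenced).orElseThrow(() ->
            new IllegalStateException(String.format(
                "Unable to replay %s: unknown value for fenced field: %d", record, fenced)));
    }
}
```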
##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1160,6 +1171,28 @@ void handleBrokerUnfenced(int brokerId, long brokerEpoch, List<ApiMessageAndVers
brokersToIsrs.partitionsWithNoLeader());
}
+ /**
+ * Generate the appropriate records to handle a broker starting a controlled shutdown.
+ *
+ * First, we create an BrokerRegistrationChangeRecord. Then, we remove this broker
+ * from any non-singleton ISR and elect new leaders for partitions led by this
+ * broker.
+ *
+ * @param brokerId The broker id.
+ * @param brokerEpoch The broker epoch.
+ * @param records The record list to append to.
+ */
+ void handleBrokerInControlledShutdown(int brokerId, long brokerEpoch, List<ApiMessageAndVersion> records) {
+ if (featureControl.metadataVersion().isInControlledShutdownStateSupported()) {
+ records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
+ setBrokerId(brokerId).setBrokerEpoch(brokerEpoch).
+ setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value()),
+ (short) 1));
Review Comment:
Is it possible for the broker to send multiple heartbeats with a shutdown
request? Do we want to handle that case and avoid writing another record?
Looking at the implementation of `generateLeaderAndIsrUpdates`, it looks
like it already avoids generating multiple partition change records, since the
broker was already removed from the ISR.
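A minimal sketch of the guard, assuming the controller's view of the registration already reflects earlier records when the next heartbeat is handled. `ControlledShutdownSketch`, its map, and the string "records" are hypothetical stand-ins for the registration state and `ApiMessageAndVersion` records. Only the first shutdown request for a broker emits a change.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the controller's view of brokers in controlled shutdown. */
final class ControlledShutdownSketch {
    // brokerId -> inControlledShutdown flag of its current registration
    private final Map<Integer, Boolean> inControlledShutdown = new HashMap<>();

    void register(int brokerId) {
        inControlledShutdown.put(brokerId, false);
    }

    /** Emits a registration change only for the first shutdown request. */
    void handleBrokerInControlledShutdown(int brokerId, long brokerEpoch, List<String> records) {
        if (inControlledShutdown.getOrDefault(brokerId, false)) {
            return; // repeated heartbeat with a shutdown request: nothing new to record
        }
        records.add("BrokerRegistrationChangeRecord(brokerId=" + brokerId
            + ", brokerEpoch=" + brokerEpoch + ", inControlledShutdown=IN_CONTROLLED_SHUTDOWN)");
        // Stand-in for replaying the record above.
        inControlledShutdown.put(brokerId, true);
    }
}
```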
##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -429,8 +476,10 @@ private void replayRegistrationChange(
"registration with that epoch found", record.toString()));
} else {
BrokerRegistration nextRegistration = curRegistration;
- if (fencingChange != BrokerRegistrationFencingChange.NONE) {
- nextRegistration = nextRegistration.cloneWithFencing(fencingChange.asBoolean().get());
+ if (fencingChange != BrokerRegistrationFencingChange.NONE
+ || inControlledShutdownChange != BrokerRegistrationInControlledShutdownChange.NONE) {
Review Comment:
This check is not needed since `cloneWith` works for all the enum values of
these two types. If you would like to avoid an object allocation, then maybe you
can move this check to `cloneWith`.
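A sketch of moving the check into `cloneWith`, assuming each change enum's `asBoolean()` (used as `fencingChange.asBoolean()` in the diff) yields `Optional.empty()` for `NONE`, so callers can pass it straight through. Plain `Optional<Boolean>` arguments and the hypothetical `RegistrationNoOpSketch` class stand in for the real types. When nothing changes, `cloneWith` returns `this` without allocating, and the caller's identity check doubles as a "registration changed" check.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for BrokerRegistration with an allocation-free no-op path. */
final class RegistrationNoOpSketch {
    final int id;
    final boolean fenced;
    final boolean inControlledShutdown;

    RegistrationNoOpSketch(int id, boolean fenced, boolean inControlledShutdown) {
        this.id = id;
        this.fenced = fenced;
        this.inControlledShutdown = inControlledShutdown;
    }

    /** Returns this instance, without allocating, when no field would change. */
    RegistrationNoOpSketch cloneWith(Optional<Boolean> fencingChange,
                                     Optional<Boolean> inControlledShutdownChange) {
        boolean newFenced = fencingChange.orElse(fenced);
        boolean newInControlledShutdown = inControlledShutdownChange.orElse(inControlledShutdown);
        if (newFenced == fenced && newInControlledShutdown == inControlledShutdown) {
            return this;
        }
        return new RegistrationNoOpSketch(id, newFenced, newInControlledShutdown);
    }

    /** Caller without any NONE checks: store the result only if it is a new object. */
    static void replayChange(Map<Integer, RegistrationNoOpSketch> registrations, int brokerId,
                             Optional<Boolean> fencingChange,
                             Optional<Boolean> inControlledShutdownChange) {
        RegistrationNoOpSketch current = registrations.get(brokerId);
        RegistrationNoOpSketch next = current.cloneWith(fencingChange, inControlledShutdownChange);
        if (next != current) {
            registrations.put(brokerId, next);
        }
    }
}
```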
##########
metadata/src/main/resources/common/metadata/BrokerRegistrationChangeRecord.json:
##########
@@ -17,14 +17,16 @@
"apiKey": 17,
"type": "metadata",
"name": "BrokerRegistrationChangeRecord",
- "validVersions": "0",
+ "validVersions": "0-1",
Review Comment:
Hmm. This version bump is not strictly required since we are protecting the
tagged field with the metadata version.
For example, in `PartitionChangeRecord` we added the tagged field
`LeaderRecoveryState` without changing the version.
@mumrah @cmccabe Do you have an opinion on this?
##########
metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java:
##########
@@ -152,13 +156,22 @@ public boolean fenced() {
return fenced;
}
- public ApiMessageAndVersion toRecord() {
+ public boolean inControlledShutdown() {
+ return inControlledShutdown;
+ }
+
+ public ApiMessageAndVersion toRecord(MetadataVersion metadataVersion) {
RegisterBrokerRecord registrationRecord = new RegisterBrokerRecord().
setBrokerId(id).
setRack(rack.orElse(null)).
setBrokerEpoch(epoch).
setIncarnationId(incarnationId).
setFenced(fenced);
+
+ if (metadataVersion.isInControlledShutdownStateSupported()) {
+ registrationRecord.setInControlledShutdown(inControlledShutdown);
+ }
Review Comment:
Isn't it an error if `inControlledShutdown` is true and `metadataVersion`
doesn't support this field? In other words, I think we can just set this field
if `inControlledShutdown` is `true`.
I suspect that we would get an `UnsupportedVersionException` when writing
this record to the log or snapshot.
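A sketch of "set the field only when it is `true`", assuming `InControlledShutdown` is a tagged field whose default is `false`, so leaving it unset writes nothing. `RegisterBrokerRecordSketch` and the boolean version flag are hypothetical stand-ins for `RegisterBrokerRecord` and `MetadataVersion#isInControlledShutdownStateSupported`. The explicit guard is an optional extra that turns the suspected serialization-time failure into an immediate one.

```java
/** Hypothetical stand-in for RegisterBrokerRecord; the tagged field defaults to false. */
final class RegisterBrokerRecordSketch {
    boolean inControlledShutdown = false;
}

final class ToRecordSketch {
    /**
     * Sets the field only when it carries information (true). A true value with an
     * older metadata version is treated as a bug and rejected here explicitly.
     */
    static RegisterBrokerRecordSketch toRecord(boolean inControlledShutdown,
                                               boolean metadataVersionSupportsField) {
        RegisterBrokerRecordSketch record = new RegisterBrokerRecordSketch();
        if (inControlledShutdown) {
            if (!metadataVersionSupportsField) {
                throw new IllegalStateException(
                    "inControlledShutdown=true is not supported by this metadata version");
            }
            record.inControlledShutdown = true;
        }
        return record;
    }
}
```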
##########
metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationInControlledShutdownChange.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum BrokerRegistrationInControlledShutdownChange {
Review Comment:
What do you think about documenting why `Optional.of(false)` is not a valid
change?
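A sketch of how that documentation could read on the enum. The rationale in the comment is an assumption: a broker is taken to leave controlled shutdown only by shutting down and later re-registering, which starts a fresh registration with the flag cleared. The numeric values and the `InControlledShutdownChangeSketch` name are hypothetical; only `NONE`, `IN_CONTROLLED_SHUTDOWN`, `value()`, `asBoolean()` and `fromValue` appear in the PR.

```java
import java.util.Arrays;
import java.util.Optional;

/**
 * Hypothetical sketch of BrokerRegistrationInControlledShutdownChange with the
 * requested documentation.
 *
 * There is intentionally no constant whose asBoolean() is Optional.of(false).
 * Assumption: a broker never leaves controlled shutdown through a registration
 * change; it shuts down and later re-registers, and the new registration starts
 * with inControlledShutdown = false.
 */
enum InControlledShutdownChangeSketch {
    NONE(0, Optional.empty()),
    IN_CONTROLLED_SHUTDOWN(1, Optional.of(true));

    private final byte value;
    private final Optional<Boolean> asBoolean;

    InControlledShutdownChangeSketch(int value, Optional<Boolean> asBoolean) {
        this.value = (byte) value;
        this.asBoolean = asBoolean;
    }

    byte value() { return value; }

    Optional<Boolean> asBoolean() { return asBoolean; }

    static Optional<InControlledShutdownChangeSketch> fromValue(byte value) {
        return Arrays.stream(values()).filter(c -> c.value == value).findFirst();
    }
}
```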
##########
metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java:
##########
@@ -102,10 +104,34 @@ public void replay(UnfenceBrokerRecord record) {
public void replay(BrokerRegistrationChangeRecord record) {
BrokerRegistration broker = getBrokerOrThrow(record.brokerId(), record.brokerEpoch(), "change");
- if (record.fenced() < 0) {
- changedBrokers.put(record.brokerId(), Optional.of(broker.cloneWithFencing(false)));
- } else if (record.fenced() > 0) {
- changedBrokers.put(record.brokerId(), Optional.of(broker.cloneWithFencing(true)));
+ Optional<BrokerRegistrationFencingChange> fencingChange =
+ BrokerRegistrationFencingChange.fromValue(record.fenced());
+ if (!fencingChange.isPresent()) {
+ throw new IllegalStateException(String.format("Unable to replay %s: unknown " +
+ "value for fenced field: %d", record.toString(), record.fenced()));
+ }
+ Optional<BrokerRegistrationInControlledShutdownChange> inControlledShutdownChange =
+ BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown());
+ if (!inControlledShutdownChange.isPresent()) {
+ throw new IllegalStateException(String.format("Unable to replay %s: unknown " +
+ "value for inControlledShutdown field: %d", record.toString(), record.inControlledShutdown()));
+ }
+
+ replayRegistration(record.brokerId(), broker, fencingChange.get(), inControlledShutdownChange.get());
+ }
+
+ private void replayRegistration(
+ int brokerId,
+ BrokerRegistration broker,
+ BrokerRegistrationFencingChange fencingChange,
+ BrokerRegistrationInControlledShutdownChange inControlledShutdownChange
+ ) {
+ if (fencingChange != BrokerRegistrationFencingChange.NONE
+ || inControlledShutdownChange != BrokerRegistrationInControlledShutdownChange.NONE) {
Review Comment:
If you follow my other comment regarding `cloneWith`, you may be able to
replace this check with a check for whether the `BrokerRegistration` changed.
As we add more changeable registration fields, developers would otherwise need
to proliferate this kind of check.
--
This is an automated message from the Apache Git Service.