junrao commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r499753420
##########
File path: core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
##########
@@ -185,7 +185,7 @@ class BrokerEndPointTest {
"endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
"listener_security_protocol_map":{"CLIENT":"SSL",
"REPLICATION":"PLAINTEXT"},
"rack":"dc1",
- "features": {"feature1": {"min_version": 1, "max_version": 2},
"feature2": {"min_version": 2, "max_version": 4}}
+ "features": {"feature1": {"min_version": 1, "first_active_version": 1,
"max_version": 2}, "feature2": {"min_version": 2, "first_active_version": 2,
"max_version": 4}}
Review comment:
Should we revert the changes here?
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -272,6 +281,161 @@ class KafkaController(val config: KafkaConfig,
}
}
+ private def createFeatureZNode(newNode: FeatureZNode): Int = {
+ info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents:
$newNode")
+ zkClient.createFeatureZNode(newNode)
+ val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+ newVersion
+ }
+
+ private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+ info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents:
$updatedNode")
+ zkClient.updateFeatureZNode(updatedNode)
+ }
+
+ /**
+ * This method enables the feature versioning system (KIP-584).
+ *
+ * Development in Kafka (from a high level) is organized into features. Each
feature is tracked by
+ * a name and a range of version numbers. A feature can be of two types:
+ *
+ * 1. Supported feature:
+ * A supported feature is represented by a name (string) and a range of
versions (defined by a
+ * SupportedVersionRange). It refers to a feature that a particular broker
advertises support for.
+ * Each broker advertises the version ranges of its own supported features
in its own
+ * BrokerIdZNode. The contents of the advertisement are specific to the
particular broker and
+ * do not represent any guarantee of a cluster-wide availability of the
feature for any particular
+ * range of versions.
+ *
+ * 2. Finalized feature:
+ * A finalized feature is represented by a name (string) and a range of
version levels (defined
+ * by a FinalizedVersionRange). Whenever the feature versioning system
(KIP-584) is
+ * enabled, the finalized features are stored in the cluster-wide common
FeatureZNode.
+ * In comparison to a supported feature, the key difference is that a
finalized feature exists
+ * in ZK only when it is guaranteed to be supported by any random broker in
the cluster for a
+ * specified range of version levels. Also, the controller is the only
entity modifying the
+ * information about finalized features.
+ *
+ * This method sets up the FeatureZNode with enabled status, which means
that the finalized
+ * features stored in the FeatureZNode are active. The enabled status should
be written by the
+ * controller to the FeatureZNode only when the broker IBP config is greater
than or equal to
+ * KAFKA_2_7_IV0.
+ *
+ * There are multiple cases handled here:
+ *
+ * 1. New cluster bootstrap:
+ * A new Kafka cluster (i.e. it is deployed first time) is almost always
started with IBP config
+ * setting greater than or equal to KAFKA_2_7_IV0. We would like to start
the cluster with all
+ * the possible supported features finalized immediately. Assuming this
is the case, the
+ * controller will start up and notice that the FeatureZNode is absent in
the new cluster,
+ * it will then create a FeatureZNode (with enabled status) containing
the entire list of
+ * supported features as its finalized features.
+ *
+ * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+ * Imagine there was an existing Kafka cluster with IBP config less than
KAFKA_2_7_IV0, and the
+ * broker binary has now been upgraded to a newer version that supports
the feature versioning
+ * system (KIP-584). But the IBP config is still set to lower than
KAFKA_2_7_IV0, and may be
+ * set to a higher value later. In this case, we want to start with no
finalized features and
+ * allow the user to finalize them whenever they are ready i.e. in the
future whenever the
+ * user sets IBP config to be greater than or equal to KAFKA_2_7_IV0,
then the user could start
+ * finalizing the features. This process ensures we do not enable all the
possible features
+ * immediately after an upgrade, which could be harmful to Kafka.
+ * This is how we handle such a case:
+ * - Before the IBP config upgrade (i.e. IBP config set to less than
KAFKA_2_7_IV0), the
+ * controller will start up and check if the FeatureZNode is absent.
+ * - If the node is absent, it will react by creating a FeatureZNode
with disabled status
+ * and empty finalized features.
+ * - Otherwise, if a node already exists in enabled status then the
controller will just
+ * flip the status to disabled and clear the finalized features.
+ * - After the IBP config upgrade (i.e. IBP config set to greater than
or equal to
+ * KAFKA_2_7_IV0), when the controller starts up it will check if the
FeatureZNode exists
+ * and whether it is disabled.
+ * - If the node is in disabled status, the controller won’t upgrade
all features immediately.
+ * Instead it will just switch the FeatureZNode status to enabled
status. This lets the
+ * user finalize the features later.
+ * - Otherwise, if a node already exists in enabled status then the
controller will leave
+ * the node umodified.
+ *
+ * 3. Broker binary upgraded, with existing cluster IBP config >=
KAFKA_2_7_IV0:
+ * Imagine there was an existing Kafka cluster with IBP config >=
KAFKA_2_7_IV0, and the broker
+ * binary has just been upgraded to a newer version (that supports IBP
config KAFKA_2_7_IV0 and
+ * higher). The controller will start up and find that a FeatureZNode is
already present with
+ * enabled status and existing finalized features. In such a case, the
controller leaves the node
+ * unmodified.
+ *
+ * 4. Broker downgrade:
+ * Imagine that a Kafka cluster exists already and the IBP config is
greater than or equal to
+ * KAFKA_2_7_IV0. Then, the user decided to downgrade the cluster by
setting IBP config to a
+ * value less than KAFKA_2_7_IV0. This means the user is also disabling
the feature versioning
+ * system (KIP-584). In this case, when the controller starts up with the
lower IBP config, it
+ * will switch the FeatureZNode status to disabled with empty features.
+ */
+ private def enableFeatureVersioning(): Unit = {
+ val (mayBeFeatureZNodeBytes, version) =
zkClient.getDataAndVersion(FeatureZNode.path)
+ if (version == ZkVersion.UnknownVersion) {
+ val newVersion = createFeatureZNode(new
FeatureZNode(FeatureZNodeStatus.Enabled,
+
brokerFeatures.defaultFinalizedFeatures))
+ featureCache.waitUntilEpochOrThrow(newVersion,
config.zkConnectionTimeoutMs)
+ } else {
+ val existingFeatureZNode =
FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+ val newFeatures = existingFeatureZNode.status match {
+ case FeatureZNodeStatus.Enabled => existingFeatureZNode.features
+ case FeatureZNodeStatus.Disabled =>
+ if (!existingFeatureZNode.features.empty()) {
+ warn(s"FeatureZNode at path: ${FeatureZNode.path} with disabled
status" +
+ " contains non-empty features.")
+ }
+ Features.emptyFinalizedFeatures
+ }
+ val newFeatureZNode = new FeatureZNode(FeatureZNodeStatus.Enabled,
newFeatures)
Review comment:
It's a bit odd that the type of FeatureZNode.status is
FeatureZNodeStatus.Value. Could it be defined as just
FeatureZNodeStatus?
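For illustration, assuming FeatureZNodeStatus is currently a scala.Enumeration (which the .Value type suggests), one way to get a plain FeatureZNodeStatus type is a sealed trait with case objects. This is a minimal sketch under that assumption, not the PR's code: the ids 0 and 1, the fromId helper, and the NodeSketch class are hypothetical.

```scala
// Hypothetical sketch: a sealed trait in place of a scala.Enumeration,
// so that the status type is FeatureZNodeStatus instead of FeatureZNodeStatus.Value.
sealed trait FeatureZNodeStatus {
  def id: Int
}

object FeatureZNodeStatus {
  // The numeric ids are assumptions made for this example.
  case object Disabled extends FeatureZNodeStatus { val id: Int = 0 }
  case object Enabled extends FeatureZNodeStatus { val id: Int = 1 }

  // Hypothetical helper: look up a status by its numeric id.
  def fromId(id: Int): Option[FeatureZNodeStatus] =
    Seq[FeatureZNodeStatus](Disabled, Enabled).find(_.id == id)
}

// Hypothetical holder class: the field type no longer needs the ".Value" suffix.
final case class NodeSketch(status: FeatureZNodeStatus)
```

A side effect of this shape is that a match over the status (such as the Enabled/Disabled match in enableFeatureVersioning) can be checked for exhaustiveness by the compiler.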
##########
File path: core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala
##########
@@ -0,0 +1,580 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.{Optional, Properties}
+import java.util.concurrent.ExecutionException
+
+import kafka.api.KAFKA_2_7_IV0
+import kafka.utils.TestUtils
+import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion}
+import kafka.utils.TestUtils.waitUntilTrue
+import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions,
FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult}
+import org.apache.kafka.common.errors.InvalidRequestException
+import org.apache.kafka.common.feature.FinalizedVersionRange
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.message.UpdateFeaturesRequestData
+import
org.apache.kafka.common.message.UpdateFeaturesRequestData.FeatureUpdateKeyCollection
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{UpdateFeaturesRequest,
UpdateFeaturesResponse}
+import org.apache.kafka.common.utils.Utils
+import org.junit.Test
+import org.junit.Assert.{assertEquals, assertFalse, assertNotEquals,
assertNotNull, assertTrue}
+import org.scalatest.Assertions.intercept
+
+import scala.jdk.CollectionConverters._
+import scala.reflect.ClassTag
+import scala.util.matching.Regex
+
+class UpdateFeaturesTest extends BaseRequestTest {
+
+ override def brokerCount = 3
+
+ override def brokerPropertyOverrides(props: Properties): Unit = {
+ props.put(KafkaConfig.InterBrokerProtocolVersionProp,
KAFKA_2_7_IV0.toString)
+ }
+
+ private def defaultSupportedFeatures(): Features[SupportedVersionRange] = {
+ Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new
SupportedVersionRange(1, 3))))
+ }
+
+ private def defaultFinalizedFeatures(): Features[FinalizedVersionRange] = {
+ Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new
FinalizedVersionRange(1, 2))))
+ }
+
+ private def updateSupportedFeatures(
+ features: Features[SupportedVersionRange], targetServers:
Set[KafkaServer]): Unit = {
+ targetServers.foreach(s => {
+ s.brokerFeatures.setSupportedFeatures(features)
+ s.zkClient.updateBrokerInfo(s.createBrokerInfo)
+ })
+
+ // Wait until updates to all BrokerZNode supported features propagate to
the controller.
+ val brokerIds = targetServers.map(s => s.config.brokerId)
+ waitUntilTrue(
+ () => servers.exists(s => {
+ if (s.kafkaController.isActive) {
+ s.kafkaController.controllerContext.liveOrShuttingDownBrokers
+ .filter(b => brokerIds.contains(b.id))
+ .forall(b => {
+ b.features.equals(features)
+ })
+ } else {
+ false
+ }
+ }),
+ "Controller did not get broker updates")
+ }
+
+ private def updateSupportedFeaturesInAllBrokers(features:
Features[SupportedVersionRange]): Unit = {
+ updateSupportedFeatures(features, Set[KafkaServer]() ++ servers)
+ }
+
+ private def updateFeatureZNode(features: Features[FinalizedVersionRange]):
Int = {
+ val server = serverForId(0).get
+ val newNode = new FeatureZNode(FeatureZNodeStatus.Enabled, features)
+ val newVersion = server.zkClient.updateFeatureZNode(newNode)
+ servers.foreach(s => {
+ s.featureCache.waitUntilEpochOrThrow(newVersion,
s.config.zkConnectionTimeoutMs)
+ })
+ newVersion
+ }
+
+ private def getFeatureZNode(): FeatureZNode = {
+ val (mayBeFeatureZNodeBytes, version) =
serverForId(0).get.zkClient.getDataAndVersion(FeatureZNode.path)
+ assertNotEquals(version, ZkVersion.UnknownVersion)
+ FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+ }
+
+ private def finalizedFeatures(features: java.util.Map[String,
org.apache.kafka.clients.admin.FinalizedVersionRange]):
Features[FinalizedVersionRange] = {
+ Features.finalizedFeatures(features.asScala.map {
+ case(name, versionRange) =>
+ (name, new FinalizedVersionRange(versionRange.minVersionLevel(),
versionRange.maxVersionLevel()))
+ }.asJava)
+ }
+
+ private def supportedFeatures(features: java.util.Map[String,
org.apache.kafka.clients.admin.SupportedVersionRange]):
Features[SupportedVersionRange] = {
+ Features.supportedFeatures(features.asScala.map {
+ case(name, versionRange) =>
+ (name, new SupportedVersionRange(versionRange.minVersion(),
versionRange.maxVersion()))
+ }.asJava)
+ }
+
+ private def checkFeatures(client: Admin,
+ expectedNode: FeatureZNode,
+ expectedFinalizedFeatures:
Features[FinalizedVersionRange],
+ expectedFinalizedFeaturesEpoch: Long,
+ expectedSupportedFeatures:
Features[SupportedVersionRange]): Unit = {
+ assertEquals(expectedNode, getFeatureZNode())
+ val featureMetadata = client.describeFeatures(
+ new
DescribeFeaturesOptions().sendRequestToController(true)).featureMetadata.get
+ assertEquals(expectedFinalizedFeatures,
finalizedFeatures(featureMetadata.finalizedFeatures))
+ assertEquals(expectedSupportedFeatures,
supportedFeatures(featureMetadata.supportedFeatures))
+ assertEquals(Optional.of(expectedFinalizedFeaturesEpoch),
featureMetadata.finalizedFeaturesEpoch)
+ }
+
+ private def checkException[ExceptionType <: Throwable](result:
UpdateFeaturesResult,
+
featureExceptionMsgPatterns: Map[String, Regex])
+ (implicit tag:
ClassTag[ExceptionType]): Unit = {
+ featureExceptionMsgPatterns.foreach {
+ case (feature, exceptionMsgPattern) =>
+ val exception = intercept[ExecutionException] {
+ result.values().get(feature).get()
+ }
+ val cause = exception.getCause
+ assertNotNull(cause)
+ assertEquals(cause.getClass, tag.runtimeClass)
+ assertTrue(s"Received unexpected error message: ${cause.getMessage}",
+ exceptionMsgPattern.findFirstIn(cause.getMessage).isDefined)
+ }
+ }
+
+ /**
+ * Tests whether an invalid feature update does not get processed on the
server as expected,
+ * and raises the ExceptionType on the client side as expected.
+ *
+ * @param invalidUpdate the invalid feature update to be sent in the
+ * updateFeatures request to the server
+ * @param exceptionMsgPattern a pattern for the expected exception message
+ */
+ private def testWithInvalidFeatureUpdate[ExceptionType <:
Throwable](feature: String,
Review comment:
Could we add the feature parameter to the javadoc above?
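A possible wording for the missing entry, shown alongside the two existing ones. The description of feature is an assumption inferred from the parameter's name and type, so treat it as a suggestion only.

```scala
/**
 * @param feature             the name of the feature that the invalid update targets
 * @param invalidUpdate       the invalid feature update to be sent in the
 *                            updateFeatures request to the server
 * @param exceptionMsgPattern a pattern for the expected exception message
 */
```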
##########
File path: core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala
##########
@@ -0,0 +1,580 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.{Optional, Properties}
+import java.util.concurrent.ExecutionException
+
+import kafka.api.KAFKA_2_7_IV0
+import kafka.utils.TestUtils
+import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion}
+import kafka.utils.TestUtils.waitUntilTrue
+import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions,
FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult}
+import org.apache.kafka.common.errors.InvalidRequestException
+import org.apache.kafka.common.feature.FinalizedVersionRange
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.message.UpdateFeaturesRequestData
+import
org.apache.kafka.common.message.UpdateFeaturesRequestData.FeatureUpdateKeyCollection
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{UpdateFeaturesRequest,
UpdateFeaturesResponse}
+import org.apache.kafka.common.utils.Utils
+import org.junit.Test
+import org.junit.Assert.{assertEquals, assertFalse, assertNotEquals,
assertNotNull, assertTrue}
+import org.scalatest.Assertions.intercept
+
+import scala.jdk.CollectionConverters._
+import scala.reflect.ClassTag
+import scala.util.matching.Regex
+
+class UpdateFeaturesTest extends BaseRequestTest {
+
+ override def brokerCount = 3
+
+ override def brokerPropertyOverrides(props: Properties): Unit = {
+ props.put(KafkaConfig.InterBrokerProtocolVersionProp,
KAFKA_2_7_IV0.toString)
+ }
+
+ private def defaultSupportedFeatures(): Features[SupportedVersionRange] = {
+ Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new
SupportedVersionRange(1, 3))))
+ }
+
+ private def defaultFinalizedFeatures(): Features[FinalizedVersionRange] = {
+ Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new
FinalizedVersionRange(1, 2))))
+ }
+
+ private def updateSupportedFeatures(
+ features: Features[SupportedVersionRange], targetServers:
Set[KafkaServer]): Unit = {
+ targetServers.foreach(s => {
+ s.brokerFeatures.setSupportedFeatures(features)
+ s.zkClient.updateBrokerInfo(s.createBrokerInfo)
+ })
+
+ // Wait until updates to all BrokerZNode supported features propagate to
the controller.
+ val brokerIds = targetServers.map(s => s.config.brokerId)
+ waitUntilTrue(
+ () => servers.exists(s => {
+ if (s.kafkaController.isActive) {
+ s.kafkaController.controllerContext.liveOrShuttingDownBrokers
+ .filter(b => brokerIds.contains(b.id))
+ .forall(b => {
+ b.features.equals(features)
+ })
+ } else {
+ false
+ }
+ }),
+ "Controller did not get broker updates")
+ }
+
+ private def updateSupportedFeaturesInAllBrokers(features:
Features[SupportedVersionRange]): Unit = {
+ updateSupportedFeatures(features, Set[KafkaServer]() ++ servers)
+ }
+
+ private def updateFeatureZNode(features: Features[FinalizedVersionRange]):
Int = {
+ val server = serverForId(0).get
+ val newNode = new FeatureZNode(FeatureZNodeStatus.Enabled, features)
+ val newVersion = server.zkClient.updateFeatureZNode(newNode)
+ servers.foreach(s => {
+ s.featureCache.waitUntilEpochOrThrow(newVersion,
s.config.zkConnectionTimeoutMs)
+ })
+ newVersion
+ }
+
+ private def getFeatureZNode(): FeatureZNode = {
+ val (mayBeFeatureZNodeBytes, version) =
serverForId(0).get.zkClient.getDataAndVersion(FeatureZNode.path)
+ assertNotEquals(version, ZkVersion.UnknownVersion)
+ FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+ }
+
+ private def finalizedFeatures(features: java.util.Map[String,
org.apache.kafka.clients.admin.FinalizedVersionRange]):
Features[FinalizedVersionRange] = {
+ Features.finalizedFeatures(features.asScala.map {
+ case(name, versionRange) =>
+ (name, new FinalizedVersionRange(versionRange.minVersionLevel(),
versionRange.maxVersionLevel()))
+ }.asJava)
+ }
+
+ private def supportedFeatures(features: java.util.Map[String,
org.apache.kafka.clients.admin.SupportedVersionRange]):
Features[SupportedVersionRange] = {
+ Features.supportedFeatures(features.asScala.map {
+ case(name, versionRange) =>
+ (name, new SupportedVersionRange(versionRange.minVersion(),
versionRange.maxVersion()))
+ }.asJava)
+ }
+
+ private def checkFeatures(client: Admin,
+ expectedNode: FeatureZNode,
+ expectedFinalizedFeatures:
Features[FinalizedVersionRange],
+ expectedFinalizedFeaturesEpoch: Long,
+ expectedSupportedFeatures:
Features[SupportedVersionRange]): Unit = {
+ assertEquals(expectedNode, getFeatureZNode())
+ val featureMetadata = client.describeFeatures(
+ new
DescribeFeaturesOptions().sendRequestToController(true)).featureMetadata.get
+ assertEquals(expectedFinalizedFeatures,
finalizedFeatures(featureMetadata.finalizedFeatures))
+ assertEquals(expectedSupportedFeatures,
supportedFeatures(featureMetadata.supportedFeatures))
+ assertEquals(Optional.of(expectedFinalizedFeaturesEpoch),
featureMetadata.finalizedFeaturesEpoch)
+ }
+
+ private def checkException[ExceptionType <: Throwable](result:
UpdateFeaturesResult,
+
featureExceptionMsgPatterns: Map[String, Regex])
+ (implicit tag:
ClassTag[ExceptionType]): Unit = {
+ featureExceptionMsgPatterns.foreach {
+ case (feature, exceptionMsgPattern) =>
+ val exception = intercept[ExecutionException] {
+ result.values().get(feature).get()
+ }
+ val cause = exception.getCause
+ assertNotNull(cause)
+ assertEquals(cause.getClass, tag.runtimeClass)
+ assertTrue(s"Received unexpected error message: ${cause.getMessage}",
+ exceptionMsgPattern.findFirstIn(cause.getMessage).isDefined)
+ }
+ }
+
+ /**
+ * Tests whether an invalid feature update does not get processed on the
server as expected,
+ * and raises the ExceptionType on the client side as expected.
+ *
+ * @param invalidUpdate the invalid feature update to be sent in the
+ * updateFeatures request to the server
+ * @param exceptionMsgPattern a pattern for the expected exception message
+ */
+ private def testWithInvalidFeatureUpdate[ExceptionType <:
Throwable](feature: String,
+
invalidUpdate: FeatureUpdate,
+
exceptionMsgPattern: Regex)
+
(implicit tag: ClassTag[ExceptionType]): Unit = {
+ TestUtils.waitUntilControllerElected(zkClient)
+
+ updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures())
+ val versionBefore = updateFeatureZNode(defaultFinalizedFeatures())
+ val adminClient = createAdminClient()
+ val nodeBefore = getFeatureZNode()
+
+ val result = adminClient.updateFeatures(Utils.mkMap(Utils.mkEntry(feature,
invalidUpdate)), new UpdateFeaturesOptions())
+
+ checkException[ExceptionType](result, Map(feature -> exceptionMsgPattern))
+ checkFeatures(
+ adminClient,
+ nodeBefore,
+ defaultFinalizedFeatures(),
+ versionBefore,
+ defaultSupportedFeatures())
+ }
+
+ /**
+ * Tests that an UpdateFeatures request sent to a non-Controller node fails
as expected.
+ */
+ @Test
+ def testShouldFailRequestIfNotController(): Unit = {
+ TestUtils.waitUntilControllerElected(zkClient)
+
+ updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures())
+ val versionBefore = updateFeatureZNode(defaultFinalizedFeatures())
+
+ val nodeBefore = getFeatureZNode()
+ val validUpdates = new FeatureUpdateKeyCollection()
+ val validUpdate = new UpdateFeaturesRequestData.FeatureUpdateKey();
+ validUpdate.setFeature("feature_1");
+
validUpdate.setMaxVersionLevel(defaultSupportedFeatures().get("feature_1").max())
+ validUpdate.setAllowDowngrade(false)
+ validUpdates.add(validUpdate)
+
+ val response = connectAndReceive[UpdateFeaturesResponse](
+ new UpdateFeaturesRequest.Builder(new
UpdateFeaturesRequestData().setFeatureUpdates(validUpdates)).build(),
+ notControllerSocketServer)
+
+ assertEquals(Errors.NOT_CONTROLLER,
Errors.forCode(response.data.errorCode()))
+ assertNotNull(response.data.errorMessage())
+ assertEquals(0, response.data.results.size)
+ checkFeatures(
+ createAdminClient(),
+ nodeBefore,
+ defaultFinalizedFeatures(),
+ versionBefore,
+ defaultSupportedFeatures())
+ }
+
+ /**
+ * Tests that an UpdateFeatures request fails in the Controller, when, for a
feature the
+ * allowDowngrade flag is not set during a downgrade request.
+ */
+ @Test
+ def testShouldFailRequestWhenDowngradeFlagIsNotSetDuringDowngrade(): Unit = {
+ val targetMaxVersionLevel =
(defaultFinalizedFeatures().get("feature_1").max() - 1).asInstanceOf[Short]
+ testWithInvalidFeatureUpdate[InvalidRequestException](
+ "feature_1",
+ new FeatureUpdate(targetMaxVersionLevel,false),
+ ".*Can not downgrade finalized feature.*allowDowngrade.*".r)
+ }
+
+ /**
+ * Tests that an UpdateFeatures request fails in the Controller, when, for a
feature the downgrade
+ * is attempted to a max version level thats higher than the existing max
version level.
+ */
+ @Test
+ def testShouldFailRequestWhenDowngradeToHigherVersionLevelIsAttempted():
Unit = {
+ val targetMaxVersionLevel =
(defaultFinalizedFeatures().get("feature_1").max() + 1).asInstanceOf[Short]
+ testWithInvalidFeatureUpdate[InvalidRequestException](
+ "feature_1",
+ new FeatureUpdate(targetMaxVersionLevel, true),
+ ".*When the allowDowngrade flag set in the request, the provided
maxVersionLevel:3.*existing maxVersionLevel:2.*".r)
+ }
+
+ /**
+ * Tests that an UpdateFeatures request fails in the Controller, when, a
feature deletion is
+ * attempted without setting the allowDowngrade flag.
+ */
+ @Test
+ def testShouldFailRequestInServerWhenDowngradeFlagIsNotSetDuringDeletion():
Unit = {
+ TestUtils.waitUntilControllerElected(zkClient)
+
+ updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures())
+ val versionBefore = updateFeatureZNode(defaultFinalizedFeatures())
+
+ val adminClient = createAdminClient()
+ val nodeBefore = getFeatureZNode()
+
+ val invalidUpdates
+ = new UpdateFeaturesRequestData.FeatureUpdateKeyCollection();
+ val invalidUpdate = new UpdateFeaturesRequestData.FeatureUpdateKey();
+ invalidUpdate.setFeature("feature_1")
+ invalidUpdate.setMaxVersionLevel(0)
+ invalidUpdate.setAllowDowngrade(false)
+ invalidUpdates.add(invalidUpdate);
+ val requestData = new UpdateFeaturesRequestData()
+ requestData.setFeatureUpdates(invalidUpdates);
+
+ val response = connectAndReceive[UpdateFeaturesResponse](
+ new UpdateFeaturesRequest.Builder(new
UpdateFeaturesRequestData().setFeatureUpdates(invalidUpdates)).build(),
+ controllerSocketServer)
+
+ assertEquals(1, response.data().results().size())
+ val result = response.data.results.asScala.head
+ assertEquals("feature_1", result.feature)
+ assertEquals(Errors.INVALID_REQUEST, Errors.forCode(result.errorCode))
+ assertNotNull(result.errorMessage)
+ assertFalse(result.errorMessage.isEmpty)
+ val exceptionMsgPattern = ".*Can not provide maxVersionLevel: 0 less than
1.*allowDowngrade.*".r
+ assertTrue(result.errorMessage,
exceptionMsgPattern.findFirstIn(result.errorMessage).isDefined)
+ checkFeatures(
+ adminClient,
+ nodeBefore,
+ defaultFinalizedFeatures(),
+ versionBefore,
+ defaultSupportedFeatures())
+ }
+
+ /**
+ * Tests that an UpdateFeatures request fails in the Controller, when, a
feature version level
+ * upgrade is attempted for a non-existing feature.
+ */
+ @Test
+ def testShouldFailRequestDuringDeletionOfNonExistingFeature(): Unit = {
+ testWithInvalidFeatureUpdate[InvalidRequestException](
+ "feature_non_existing",
+ new FeatureUpdate(0, true),
+ ".*Can not delete non-existing finalized feature.*".r)
+ }
+
+ /**
+ * Tests that an UpdateFeatures request fails in the Controller, when, a
feature version level
+ * upgrade is attempted to a version level thats the same as the existing
max version level.
Review comment:
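Typo: "thats" should be "that's". The same typo appears in the javadoc of
testShouldFailRequestWhenDowngradeToHigherVersionLevelIsAttempted.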
##########
File path:
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##########
@@ -715,7 +747,58 @@ class ControllerIntegrationTest extends
ZooKeeperTestHarness {
doAnswer((_: InvocationOnMock) => {
latch.countDown()
}).doCallRealMethod().when(spyThread).awaitShutdown()
- controller.shutdown()
+ controller.shutdown()
+ }
+
+ private def testControllerFeatureZNodeSetup(initialZNode:
Option[FeatureZNode],
+ interBrokerProtocolVersion:
ApiVersion): Unit = {
+ val versionBeforeOpt = initialZNode match {
+ case Some(node) =>
+ zkClient.createFeatureZNode(node)
+ Some(zkClient.getDataAndVersion(FeatureZNode.path)._2)
+ case None =>
+ Option.empty
+ }
+ servers = makeServers(1, interBrokerProtocolVersion =
Some(interBrokerProtocolVersion))
+ TestUtils.waitUntilControllerElected(zkClient)
Review comment:
This is probably not enough, since it only waits for the controller path
to be created in ZK, which happens before the finalized features are
processed.
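One option would be to wait on an observable effect of the feature processing itself (for example, the FeatureZNode reaching the expected state) in addition to controller election. The helper below is a self-contained sketch of such a polling wait. WaitSketch and waitUntil are hypothetical names, and in the test itself the existing TestUtils.waitUntilTrue would likely serve the same purpose.

```scala
// Hypothetical polling helper, written only to illustrate the idea.
object WaitSketch {
  // Re-evaluates `condition` until it returns true or `timeoutMs` has elapsed.
  // Returns true if the condition held before the deadline, false otherwise.
  def waitUntil(condition: () => Boolean, timeoutMs: Long, pauseMs: Long = 10L): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMs
    while (!condition()) {
      if (System.currentTimeMillis() >= deadline) return false
      Thread.sleep(pauseMs)
    }
    true
  }
}
```

Which condition to poll depends on the case under test: when no initial node exists, the node's version becoming known would do; when a node already exists, the check would need to look at the decoded status and features instead.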
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -272,6 +281,161 @@ class KafkaController(val config: KafkaConfig,
}
}
+ private def createFeatureZNode(newNode: FeatureZNode): Int = {
+ info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents:
$newNode")
+ zkClient.createFeatureZNode(newNode)
+ val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+ newVersion
+ }
+
+ private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+ info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents:
$updatedNode")
+ zkClient.updateFeatureZNode(updatedNode)
+ }
+
+ /**
+ * This method enables the feature versioning system (KIP-584).
+ *
+ * Development in Kafka (from a high level) is organized into features. Each
feature is tracked by
+ * a name and a range of version numbers. A feature can be of two types:
+ *
+ * 1. Supported feature:
+ * A supported feature is represented by a name (string) and a range of
versions (defined by a
+ * SupportedVersionRange). It refers to a feature that a particular broker
advertises support for.
+ * Each broker advertises the version ranges of its own supported features
in its own
+ * BrokerIdZNode. The contents of the advertisement are specific to the
particular broker and
+ * do not represent any guarantee of a cluster-wide availability of the
feature for any particular
+ * range of versions.
+ *
+ * 2. Finalized feature:
+ * A finalized feature is represented by a name (string) and a range of
version levels (defined
+ * by a FinalizedVersionRange). Whenever the feature versioning system
(KIP-584) is
+ * enabled, the finalized features are stored in the cluster-wide common
FeatureZNode.
+ * In comparison to a supported feature, the key difference is that a
finalized feature exists
+ * in ZK only when it is guaranteed to be supported by any random broker in
the cluster for a
+ * specified range of version levels. Also, the controller is the only
entity modifying the
+ * information about finalized features.
+ *
+ * This method sets up the FeatureZNode with enabled status, which means
that the finalized
+ * features stored in the FeatureZNode are active. The enabled status should
be written by the
+ * controller to the FeatureZNode only when the broker IBP config is greater
than or equal to
+ * KAFKA_2_7_IV0.
+ *
+ * There are multiple cases handled here:
+ *
+ * 1. New cluster bootstrap:
+ * A new Kafka cluster (i.e. it is deployed first time) is almost always
started with IBP config
+ * setting greater than or equal to KAFKA_2_7_IV0. We would like to start
the cluster with all
+ * the possible supported features finalized immediately. Assuming this
is the case, the
+ * controller will start up and notice that the FeatureZNode is absent in
the new cluster,
+ * it will then create a FeatureZNode (with enabled status) containing
the entire list of
+ * supported features as its finalized features.
+ *
+ * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+ * Imagine there was an existing Kafka cluster with IBP config less than
KAFKA_2_7_IV0, and the
+ * broker binary has now been upgraded to a newer version that supports
the feature versioning
+ * system (KIP-584). But the IBP config is still set to lower than
KAFKA_2_7_IV0, and may be
+ * set to a higher value later. In this case, we want to start with no
finalized features and
+ * allow the user to finalize them whenever they are ready i.e. in the
future whenever the
+ * user sets IBP config to be greater than or equal to KAFKA_2_7_IV0,
then the user could start
+ * finalizing the features. This process ensures we do not enable all the
possible features
+ * immediately after an upgrade, which could be harmful to Kafka.
+ * This is how we handle such a case:
+ * - Before the IBP config upgrade (i.e. IBP config set to less than
KAFKA_2_7_IV0), the
+ * controller will start up and check if the FeatureZNode is absent.
+ * - If the node is absent, it will react by creating a FeatureZNode
with disabled status
+ * and empty finalized features.
+ * - Otherwise, if a node already exists in enabled status then the
controller will just
+ * flip the status to disabled and clear the finalized features.
+ * - After the IBP config upgrade (i.e. IBP config set to greater than
or equal to
+ * KAFKA_2_7_IV0), when the controller starts up it will check if the
FeatureZNode exists
+ * and whether it is disabled.
+ * - If the node is in disabled status, the controller won’t upgrade
all features immediately.
+ * Instead it will just switch the FeatureZNode status to enabled
status. This lets the
+ * user finalize the features later.
+ * - Otherwise, if a node already exists in enabled status then the
controller will leave
+ * the node unmodified.
+ *
+ * 3. Broker binary upgraded, with existing cluster IBP config >=
KAFKA_2_7_IV0:
+ * Imagine there was an existing Kafka cluster with IBP config >=
KAFKA_2_7_IV0, and the broker
+ * binary has just been upgraded to a newer version (that supports IBP
config KAFKA_2_7_IV0 and
+ * higher). The controller will start up and find that a FeatureZNode is
already present with
+ * enabled status and existing finalized features. In such a case, the
controller leaves the node
+ * unmodified.
+ *
+ * 4. Broker downgrade:
+ * Imagine that a Kafka cluster exists already and the IBP config is
greater than or equal to
+ * KAFKA_2_7_IV0. Then, the user decided to downgrade the cluster by
setting IBP config to a
+ * value less than KAFKA_2_7_IV0. This means the user is also disabling
the feature versioning
+ * system (KIP-584). In this case, when the controller starts up with the
lower IBP config, it
+ * will switch the FeatureZNode status to disabled with empty features.
+ */
+ private def enableFeatureVersioning(): Unit = {
+ val (mayBeFeatureZNodeBytes, version) =
zkClient.getDataAndVersion(FeatureZNode.path)
+ if (version == ZkVersion.UnknownVersion) {
+ val newVersion = createFeatureZNode(new
FeatureZNode(FeatureZNodeStatus.Enabled,
+
brokerFeatures.defaultFinalizedFeatures))
+ featureCache.waitUntilEpochOrThrow(newVersion,
config.zkConnectionTimeoutMs)
+ } else {
+ val existingFeatureZNode =
FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+ val newFeatures = existingFeatureZNode.status match {
+ case FeatureZNodeStatus.Enabled => existingFeatureZNode.features
+ case FeatureZNodeStatus.Disabled =>
+ if (!existingFeatureZNode.features.empty()) {
+ warn(s"FeatureZNode at path: ${FeatureZNode.path} with disabled
status" +
+ " contains non-empty features.")
Review comment:
Should we log the non-empty features too?
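One way to act on this is to include the decoded features in the warning text. The sketch below models the controller-startup cases from the Javadoc above as a pure function and builds a warning that carries the features. `Node`, `reconcile`, and the `Map[String, (Int, Int)]` feature encoding are hypothetical stand-ins, not the PR's types. It also assumes the features of a disabled node are dropped when the node is re-enabled, which the quoted hunk does not show.

```scala
object FeatureZNodeStartupSketch {
  sealed trait Status
  case object Enabled extends Status
  case object Disabled extends Status

  // Hypothetical stand-in for FeatureZNode: feature name -> (min, max) version level.
  final case class Node(status: Status, features: Map[String, (Int, Int)])

  /**
   * Returns the node the controller should end up with, plus an optional warning.
   *
   * @param ibpSupportsFeatures true when the IBP config is >= KAFKA_2_7_IV0
   * @param existing            None when the FeatureZNode is absent
   * @param defaults            features to finalize on a brand-new cluster
   */
  def reconcile(ibpSupportsFeatures: Boolean,
                existing: Option[Node],
                defaults: Map[String, (Int, Int)]): (Node, Option[String]) = {
    if (!ibpSupportsFeatures) {
      // Cases 2 (before the IBP upgrade) and 4: disabled, with no finalized features.
      (Node(Disabled, Map.empty), None)
    } else existing match {
      // Case 1: new cluster, finalize everything that is supported.
      case None => (Node(Enabled, defaults), None)
      // Case 3: already enabled, leave the node unmodified.
      case Some(node @ Node(Enabled, _)) => (node, None)
      // Case 2 (after the IBP upgrade): flip to enabled, finalize nothing yet.
      case Some(Node(Disabled, features)) =>
        val warning =
          if (features.nonEmpty)
            Some(s"FeatureZNode with disabled status contains non-empty features: $features")
          else None
        // Assumption: stale features on a disabled node are dropped.
        (Node(Enabled, Map.empty), warning)
    }
  }
}
```

Returning the warning as a value rather than logging it inline keeps the sketch easy to assert on. In the controller itself the same string would presumably go straight to `warn(...)`.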
##########
File path: core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala
##########
@@ -0,0 +1,580 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.{Optional, Properties}
+import java.util.concurrent.ExecutionException
+
+import kafka.api.KAFKA_2_7_IV0
+import kafka.utils.TestUtils
+import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion}
+import kafka.utils.TestUtils.waitUntilTrue
+import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions,
FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult}
+import org.apache.kafka.common.errors.InvalidRequestException
+import org.apache.kafka.common.feature.FinalizedVersionRange
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.message.UpdateFeaturesRequestData
+import
org.apache.kafka.common.message.UpdateFeaturesRequestData.FeatureUpdateKeyCollection
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{UpdateFeaturesRequest,
UpdateFeaturesResponse}
+import org.apache.kafka.common.utils.Utils
+import org.junit.Test
+import org.junit.Assert.{assertEquals, assertFalse, assertNotEquals,
assertNotNull, assertTrue}
+import org.scalatest.Assertions.intercept
+
+import scala.jdk.CollectionConverters._
+import scala.reflect.ClassTag
+import scala.util.matching.Regex
+
+class UpdateFeaturesTest extends BaseRequestTest {
+
+ override def brokerCount = 3
+
+ override def brokerPropertyOverrides(props: Properties): Unit = {
+ props.put(KafkaConfig.InterBrokerProtocolVersionProp,
KAFKA_2_7_IV0.toString)
+ }
+
+ private def defaultSupportedFeatures(): Features[SupportedVersionRange] = {
+ Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new
SupportedVersionRange(1, 3))))
+ }
+
+ private def defaultFinalizedFeatures(): Features[FinalizedVersionRange] = {
+ Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new
FinalizedVersionRange(1, 2))))
+ }
+
+ private def updateSupportedFeatures(
+ features: Features[SupportedVersionRange], targetServers:
Set[KafkaServer]): Unit = {
+ targetServers.foreach(s => {
+ s.brokerFeatures.setSupportedFeatures(features)
+ s.zkClient.updateBrokerInfo(s.createBrokerInfo)
+ })
+
+ // Wait until updates to all BrokerZNode supported features propagate to
the controller.
+ val brokerIds = targetServers.map(s => s.config.brokerId)
+ waitUntilTrue(
+ () => servers.exists(s => {
+ if (s.kafkaController.isActive) {
+ s.kafkaController.controllerContext.liveOrShuttingDownBrokers
+ .filter(b => brokerIds.contains(b.id))
+ .forall(b => {
+ b.features.equals(features)
+ })
+ } else {
+ false
+ }
+ }),
+ "Controller did not get broker updates")
+ }
+
+ private def updateSupportedFeaturesInAllBrokers(features:
Features[SupportedVersionRange]): Unit = {
+ updateSupportedFeatures(features, Set[KafkaServer]() ++ servers)
+ }
+
+ private def updateFeatureZNode(features: Features[FinalizedVersionRange]):
Int = {
+ val server = serverForId(0).get
+ val newNode = new FeatureZNode(FeatureZNodeStatus.Enabled, features)
+ val newVersion = server.zkClient.updateFeatureZNode(newNode)
+ servers.foreach(s => {
+ s.featureCache.waitUntilEpochOrThrow(newVersion,
s.config.zkConnectionTimeoutMs)
+ })
+ newVersion
+ }
+
+ private def getFeatureZNode(): FeatureZNode = {
+ val (mayBeFeatureZNodeBytes, version) =
serverForId(0).get.zkClient.getDataAndVersion(FeatureZNode.path)
+ assertNotEquals(version, ZkVersion.UnknownVersion)
+ FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+ }
+
+ private def finalizedFeatures(features: java.util.Map[String,
org.apache.kafka.clients.admin.FinalizedVersionRange]):
Features[FinalizedVersionRange] = {
+ Features.finalizedFeatures(features.asScala.map {
+ case(name, versionRange) =>
+ (name, new FinalizedVersionRange(versionRange.minVersionLevel(),
versionRange.maxVersionLevel()))
+ }.asJava)
+ }
+
+ private def supportedFeatures(features: java.util.Map[String,
org.apache.kafka.clients.admin.SupportedVersionRange]):
Features[SupportedVersionRange] = {
+ Features.supportedFeatures(features.asScala.map {
+ case(name, versionRange) =>
+ (name, new SupportedVersionRange(versionRange.minVersion(),
versionRange.maxVersion()))
+ }.asJava)
+ }
+
+ private def checkFeatures(client: Admin,
+ expectedNode: FeatureZNode,
+ expectedFinalizedFeatures:
Features[FinalizedVersionRange],
+ expectedFinalizedFeaturesEpoch: Long,
+ expectedSupportedFeatures:
Features[SupportedVersionRange]): Unit = {
+ assertEquals(expectedNode, getFeatureZNode())
+ val featureMetadata = client.describeFeatures(
+ new
DescribeFeaturesOptions().sendRequestToController(true)).featureMetadata.get
+ assertEquals(expectedFinalizedFeatures,
finalizedFeatures(featureMetadata.finalizedFeatures))
+ assertEquals(expectedSupportedFeatures,
supportedFeatures(featureMetadata.supportedFeatures))
+ assertEquals(Optional.of(expectedFinalizedFeaturesEpoch),
featureMetadata.finalizedFeaturesEpoch)
+ }
+
+ private def checkException[ExceptionType <: Throwable](result:
UpdateFeaturesResult,
+
featureExceptionMsgPatterns: Map[String, Regex])
+ (implicit tag:
ClassTag[ExceptionType]): Unit = {
+ featureExceptionMsgPatterns.foreach {
+ case (feature, exceptionMsgPattern) =>
+ val exception = intercept[ExecutionException] {
+ result.values().get(feature).get()
+ }
+ val cause = exception.getCause
+ assertNotNull(cause)
+ assertEquals(cause.getClass, tag.runtimeClass)
+ assertTrue(s"Received unexpected error message: ${cause.getMessage}",
+ exceptionMsgPattern.findFirstIn(cause.getMessage).isDefined)
+ }
+ }
+
+ /**
+ * Tests whether an invalid feature update does not get processed on the
server as expected,
+ * and raises the ExceptionType on the client side as expected.
+ *
+ * @param invalidUpdate the invalid feature update to be sent in the
+ * updateFeatures request to the server
+ * @param exceptionMsgPattern a pattern for the expected exception message
+ */
+ private def testWithInvalidFeatureUpdate[ExceptionType <:
Throwable](feature: String,
+
invalidUpdate: FeatureUpdate,
+
exceptionMsgPattern: Regex)
+
(implicit tag: ClassTag[ExceptionType]): Unit = {
+ TestUtils.waitUntilControllerElected(zkClient)
+
+ updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures())
+ val versionBefore = updateFeatureZNode(defaultFinalizedFeatures())
+ val adminClient = createAdminClient()
+ val nodeBefore = getFeatureZNode()
+
+ val result = adminClient.updateFeatures(Utils.mkMap(Utils.mkEntry(feature,
invalidUpdate)), new UpdateFeaturesOptions())
+
+ checkException[ExceptionType](result, Map(feature -> exceptionMsgPattern))
+ checkFeatures(
+ adminClient,
+ nodeBefore,
+ defaultFinalizedFeatures(),
+ versionBefore,
+ defaultSupportedFeatures())
+ }
+
+ /**
+ * Tests that an UpdateFeatures request sent to a non-Controller node fails
as expected.
+ */
+ @Test
+ def testShouldFailRequestIfNotController(): Unit = {
+ TestUtils.waitUntilControllerElected(zkClient)
+
+ updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures())
+ val versionBefore = updateFeatureZNode(defaultFinalizedFeatures())
+
+ val nodeBefore = getFeatureZNode()
+ val validUpdates = new FeatureUpdateKeyCollection()
+ val validUpdate = new UpdateFeaturesRequestData.FeatureUpdateKey();
+ validUpdate.setFeature("feature_1");
+
validUpdate.setMaxVersionLevel(defaultSupportedFeatures().get("feature_1").max())
+ validUpdate.setAllowDowngrade(false)
+ validUpdates.add(validUpdate)
+
+ val response = connectAndReceive[UpdateFeaturesResponse](
+ new UpdateFeaturesRequest.Builder(new
UpdateFeaturesRequestData().setFeatureUpdates(validUpdates)).build(),
+ notControllerSocketServer)
+
+ assertEquals(Errors.NOT_CONTROLLER,
Errors.forCode(response.data.errorCode()))
+ assertNotNull(response.data.errorMessage())
+ assertEquals(0, response.data.results.size)
+ checkFeatures(
+ createAdminClient(),
+ nodeBefore,
+ defaultFinalizedFeatures(),
+ versionBefore,
+ defaultSupportedFeatures())
+ }
+
+ /**
+ * Tests that an UpdateFeatures request fails in the Controller, when, for a
feature the
+ * allowDowngrade flag is not set during a downgrade request.
+ */
+ @Test
+ def testShouldFailRequestWhenDowngradeFlagIsNotSetDuringDowngrade(): Unit = {
+ val targetMaxVersionLevel =
(defaultFinalizedFeatures().get("feature_1").max() - 1).asInstanceOf[Short]
+ testWithInvalidFeatureUpdate[InvalidRequestException](
+ "feature_1",
+ new FeatureUpdate(targetMaxVersionLevel,false),
+ ".*Can not downgrade finalized feature.*allowDowngrade.*".r)
+ }
+
+ /**
+ * Tests that an UpdateFeatures request fails in the Controller, when, for a
feature the downgrade
+ * is attempted to a max version level that's higher than the existing max
version level.
+ */
+ @Test
+ def testShouldFailRequestWhenDowngradeToHigherVersionLevelIsAttempted():
Unit = {
+ val targetMaxVersionLevel =
(defaultFinalizedFeatures().get("feature_1").max() + 1).asInstanceOf[Short]
+ testWithInvalidFeatureUpdate[InvalidRequestException](
+ "feature_1",
+ new FeatureUpdate(targetMaxVersionLevel, true),
+ ".*When the allowDowngrade flag set in the request, the provided
maxVersionLevel:3.*existing maxVersionLevel:2.*".r)
+ }
+
+ /**
+ * Tests that an UpdateFeatures request fails in the Controller, when, a
feature deletion is
+ * attempted without setting the allowDowngrade flag.
+ */
+ @Test
+ def testShouldFailRequestInServerWhenDowngradeFlagIsNotSetDuringDeletion():
Unit = {
+ TestUtils.waitUntilControllerElected(zkClient)
+
+ updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures())
+ val versionBefore = updateFeatureZNode(defaultFinalizedFeatures())
+
+ val adminClient = createAdminClient()
+ val nodeBefore = getFeatureZNode()
+
+ val invalidUpdates
+ = new UpdateFeaturesRequestData.FeatureUpdateKeyCollection();
+ val invalidUpdate = new UpdateFeaturesRequestData.FeatureUpdateKey();
+ invalidUpdate.setFeature("feature_1")
+ invalidUpdate.setMaxVersionLevel(0)
+ invalidUpdate.setAllowDowngrade(false)
+ invalidUpdates.add(invalidUpdate);
+ val requestData = new UpdateFeaturesRequestData()
+ requestData.setFeatureUpdates(invalidUpdates);
+
+ val response = connectAndReceive[UpdateFeaturesResponse](
+ new UpdateFeaturesRequest.Builder(new
UpdateFeaturesRequestData().setFeatureUpdates(invalidUpdates)).build(),
+ controllerSocketServer)
+
+ assertEquals(1, response.data().results().size())
+ val result = response.data.results.asScala.head
+ assertEquals("feature_1", result.feature)
+ assertEquals(Errors.INVALID_REQUEST, Errors.forCode(result.errorCode))
+ assertNotNull(result.errorMessage)
+ assertFalse(result.errorMessage.isEmpty)
+ val exceptionMsgPattern = ".*Can not provide maxVersionLevel: 0 less than
1.*allowDowngrade.*".r
+ assertTrue(result.errorMessage,
exceptionMsgPattern.findFirstIn(result.errorMessage).isDefined)
+ checkFeatures(
+ adminClient,
+ nodeBefore,
+ defaultFinalizedFeatures(),
+ versionBefore,
+ defaultSupportedFeatures())
+ }
+
+ /**
+ * Tests that an UpdateFeatures request fails in the Controller, when a
feature deletion is
+ * attempted for a non-existing feature.
+ */
+ @Test
+ def testShouldFailRequestDuringDeletionOfNonExistingFeature(): Unit = {
+ testWithInvalidFeatureUpdate[InvalidRequestException](
+ "feature_non_existing",
+ new FeatureUpdate(0, true),
Review comment:
Should we use a version > 0?
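The choice of version matters because the tests quoted above treat a `maxVersionLevel` below 1 as a deletion request (see the `Can not provide maxVersionLevel: 0 less than 1` pattern earlier in this file). `new FeatureUpdate(0, true)` would therefore exercise the deletion path, while a version > 0 would exercise the path that finalizes a feature at a new level. A minimal sketch of that classification follows; `UpdateIntent` and `classify` are hypothetical names for illustration, not part of the PR.

```scala
object FeatureUpdateIntentSketch {
  sealed trait UpdateIntent
  case object Deletion extends UpdateIntent
  case object Addition extends UpdateIntent
  case object Upgrade extends UpdateIntent
  case object Downgrade extends UpdateIntent
  case object NoChange extends UpdateIntent

  /**
   * Classifies an update by comparing the requested max version level with the
   * currently finalized one (None when the feature is not finalized at all).
   */
  def classify(existingMaxVersionLevel: Option[Int], targetMaxVersionLevel: Int): UpdateIntent =
    if (targetMaxVersionLevel < 1) Deletion // assumed rule: levels below 1 request deletion
    else existingMaxVersionLevel match {
      case None => Addition
      case Some(current) if targetMaxVersionLevel > current => Upgrade
      case Some(current) if targetMaxVersionLevel < current => Downgrade
      case Some(_) => NoChange
    }
}
```

Under these assumptions, `classify(None, 0)` is a `Deletion` and `classify(None, 1)` is an `Addition` for `feature_non_existing`. The test name and its Javadoc should describe whichever of the two the test is meant to cover.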