abbccdda commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r462453975
##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1214,6 +1215,71 @@ default AlterClientQuotasResult
alterClientQuotas(Collection<ClientQuotaAlterati
*/
AlterClientQuotasResult
alterClientQuotas(Collection<ClientQuotaAlteration> entries,
AlterClientQuotasOptions options);
+ /**
+ * Describes finalized as well as supported features. By default, the
request is issued to any
+ * broker. It can be optionally directed only to the controller via
DescribeFeaturesOptions
+ * parameter. This is particularly useful if the user requires strongly
consistent reads of
+ * finalized features.
+ * <p>
+ * The following exceptions can be anticipated when calling {@code get()}
on the future from the
+ * returned {@link DescribeFeaturesResult}:
+ * <ul>
+ * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+ * If the request timed out before the describe operation could
finish.</li>
+ * </ul>
+ * <p>
+ * @param options the options to use
+ *
+ * @return the {@link DescribeFeaturesResult} containing the
result
+ */
+ DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
Review comment:
Note that in the post-KIP-500 world this feature could still work, but the
request must be redirected to the controller on the broker side, instead of
being sent to it directly by the client. So in the comment, we may want to
phrase it to convey that the principle is `the request must be handled by the
controller` rather than `the admin client must send this request to the
controller`.
##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1214,6 +1215,70 @@ default AlterClientQuotasResult
alterClientQuotas(Collection<ClientQuotaAlterati
*/
AlterClientQuotasResult
alterClientQuotas(Collection<ClientQuotaAlteration> entries,
AlterClientQuotasOptions options);
+ /**
+ * Describes finalized as well as supported features. By default, the
request is issued to any
+ * broker. It can be optionally directed only to the controller via
DescribeFeaturesOptions
+ * parameter. This is particularly useful if the user requires strongly
consistent reads of
+ * finalized features.
+ * <p>
+ * The following exceptions can be anticipated when calling {@code get()}
on the future from the
+ * returned {@link DescribeFeaturesResult}:
+ * <ul>
+ * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+ * If the request timed out before the describe operation could
finish.</li>
+ * </ul>
+ * <p>
+ * @param options the options to use
+ *
+ * @return the {@link DescribeFeaturesResult} containing the
result
+ */
+ DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+
+ /**
+ * Applies specified updates to finalized features. This operation is not
transactional so it
+ * may succeed for some features while fail for others.
+ * <p>
+ * The API takes in a map of finalized feature name to {@link
FeatureUpdate} that need to be
Review comment:
nit: s/name/names
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -983,8 +1144,25 @@ class KafkaController(val config: KafkaConfig,
*/
private[controller] def sendUpdateMetadataRequest(brokers: Seq[Int],
partitions: Set[TopicPartition]): Unit = {
try {
+ val filteredBrokers = scala.collection.mutable.Set[Int]() ++ brokers
+ if (config.isFeatureVersioningEnabled) {
+ def hasIncompatibleFeatures(broker: Broker): Boolean = {
+ val latestFinalizedFeatures = featureCache.get
+ if (latestFinalizedFeatures.isDefined) {
+ BrokerFeatures.hasIncompatibleFeatures(broker.features,
latestFinalizedFeatures.get.features)
+ } else {
+ false
+ }
+ }
+ controllerContext.liveOrShuttingDownBrokers.foreach(broker => {
+ if (filteredBrokers.contains(broker.id) &&
hasIncompatibleFeatures(broker)) {
Review comment:
I see. What would happen to a currently live broker if it couldn't get
any metadata update for a while? Would it shut itself down?
##########
File path:
clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for {@link AdminClient#describeFeatures(DescribeFeaturesOptions)}
+ *
+ * The API of this class is evolving. See {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeFeaturesOptions extends
AbstractOptions<DescribeFeaturesOptions> {
+
+ /**
+ * - True means the {@link
Admin#describeFeatures(DescribeFeaturesOptions)} request can be
Review comment:
s/`can be issued only to the controller`/`must be processed by the
controller`
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -266,6 +275,179 @@ class KafkaController(val config: KafkaConfig,
}
}
+ private def createFeatureZNode(newNode: FeatureZNode): Int = {
+ info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents:
$newNode")
+ zkClient.createFeatureZNode(newNode)
+ val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
Review comment:
Yeah, I mean you could use `val newVersion =
zkClient.getDataAndVersion(FeatureZNode.path)._2`, but it's up to you.
##########
File path:
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4052,6 +4058,128 @@ void handleFailure(Throwable throwable) {
return new
AlterClientQuotasResult(Collections.unmodifiableMap(futures));
}
+ @Override
+ public DescribeFeaturesResult describeFeatures(final
DescribeFeaturesOptions options) {
+ final KafkaFutureImpl<FeatureMetadata> future = new
KafkaFutureImpl<>();
+ final long now = time.milliseconds();
+ final NodeProvider provider =
+ options.sendRequestToController() ? new ControllerNodeProvider() :
new LeastLoadedNodeProvider();
+
+ Call call = new Call(
+ "describeFeatures", calcDeadlineMs(now, options.timeoutMs()),
provider) {
+
+ @Override
+ ApiVersionsRequest.Builder createRequest(int timeoutMs) {
+ return new ApiVersionsRequest.Builder();
+ }
+
+ @Override
+ void handleResponse(AbstractResponse response) {
+ final ApiVersionsResponse apiVersionsResponse =
(ApiVersionsResponse) response;
+ if (apiVersionsResponse.data.errorCode() ==
Errors.NONE.code()) {
+ future.complete(
+ new FeatureMetadata(
+ apiVersionsResponse.finalizedFeatures(),
+ apiVersionsResponse.finalizedFeaturesEpoch(),
+ apiVersionsResponse.supportedFeatures()));
+ } else if (options.sendRequestToController() &&
apiVersionsResponse.data.errorCode() == Errors.NOT_CONTROLLER.code()) {
+ handleNotControllerError(Errors.NOT_CONTROLLER);
+ } else {
+ future.completeExceptionally(
+
Errors.forCode(apiVersionsResponse.data.errorCode()).exception());
+ }
+ }
+
+ @Override
+ void handleFailure(Throwable throwable) {
+ completeAllExceptionally(Collections.singletonList(future),
throwable);
+ }
+ };
+
+ runnable.call(call, now);
+ return new DescribeFeaturesResult(future);
+ }
+
+ @Override
+ public UpdateFeaturesResult updateFeatures(
+ final Map<String, FeatureUpdate> featureUpdates, final
UpdateFeaturesOptions options) {
+ if (featureUpdates == null || featureUpdates.isEmpty()) {
+ throw new IllegalArgumentException("Feature updates can not be
null or empty.");
+ }
+ Objects.requireNonNull(options, "UpdateFeaturesOptions can not be
null");
+
+ final Map<String, KafkaFutureImpl<Void>> updateFutures = new
HashMap<>();
+ final UpdateFeaturesRequestData.FeatureUpdateKeyCollection
featureUpdatesRequestData
Review comment:
I suggest we build a static method in the `UpdateFeaturesRequest` class
to avoid exposing the sub-modules of the feature data, such as:
```
public static UpdateFeaturesRequestData getFeatureRequest(final Map<String,
FeatureUpdate> featureUpdate);
```
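A rough sketch of what such a helper might look like. Every type below is a
simplified, hypothetical stand-in for the Kafka classes in the diff (the real
generated `UpdateFeaturesRequestData` has a different API), so this only
illustrates the shape of the conversion, not the actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for the Kafka types named in the diff.
class FeatureRequestSketch {

    // Stand-in for the admin-client FeatureUpdate.
    static final class FeatureUpdate {
        final short maxVersionLevel;
        final boolean allowDowngrade;

        FeatureUpdate(short maxVersionLevel, boolean allowDowngrade) {
            this.maxVersionLevel = maxVersionLevel;
            this.allowDowngrade = allowDowngrade;
        }
    }

    // Stand-in for one entry of the request's FeatureUpdates field.
    static final class FeatureUpdateKey {
        final String feature;
        final short maxVersionLevel;
        final boolean allowDowngrade;

        FeatureUpdateKey(String feature, short maxVersionLevel, boolean allowDowngrade) {
            this.feature = feature;
            this.maxVersionLevel = maxVersionLevel;
            this.allowDowngrade = allowDowngrade;
        }
    }

    // Stand-in for the request payload.
    static final class UpdateFeaturesRequestData {
        final List<FeatureUpdateKey> featureUpdates = new ArrayList<>();
    }

    // The static helper: callers pass the map and never build FeatureUpdateKey themselves.
    static UpdateFeaturesRequestData getFeatureRequest(Map<String, FeatureUpdate> featureUpdates) {
        UpdateFeaturesRequestData data = new UpdateFeaturesRequestData();
        for (Map.Entry<String, FeatureUpdate> entry : featureUpdates.entrySet()) {
            FeatureUpdate update = entry.getValue();
            data.featureUpdates.add(
                new FeatureUpdateKey(entry.getKey(), update.maxVersionLevel, update.allowDowngrade));
        }
        return data;
    }
}
```

With something along these lines, `KafkaAdminClient#updateFeatures` would only
need to call the helper, and the key collection type would stay internal to
the request class.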
##########
File path:
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -466,6 +477,42 @@ private static DescribeGroupsResponseData
prepareDescribeGroupsResponseData(Stri
Collections.emptySet()));
return data;
}
+
+ private static UpdateFeaturesResponse
prepareUpdateFeaturesResponse(Map<String, Errors> featureUpdateErrors) {
Review comment:
This could be moved to the `UpdateFeaturesResponse` class.
##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -82,18 +108,54 @@ object FinalizedFeatureCache extends Logging {
" The existing cache contents are %s").format(latest,
oldFeatureAndEpoch)
throw new FeatureCacheUpdateException(errorMsg)
} else {
- val incompatibleFeatures =
SupportedFeatures.incompatibleFeatures(latest.features)
+ val incompatibleFeatures =
brokerFeatures.incompatibleFeatures(latest.features)
if (!incompatibleFeatures.empty) {
val errorMsg = ("FinalizedFeatureCache update failed since feature
compatibility" +
" checks failed! Supported %s has incompatibilities with the latest
%s."
- ).format(SupportedFeatures.get, latest)
+ ).format(brokerFeatures.supportedFeatures, latest)
throw new FeatureCacheUpdateException(errorMsg)
} else {
- val logMsg = "Updated cache from existing finalized %s to latest
finalized %s".format(
+ val logMsg = "Updated cache from existing %s to latest %s".format(
oldFeatureAndEpoch, latest)
- featuresAndEpoch = Some(latest)
+ synchronized {
+ featuresAndEpoch = Some(latest)
+ notifyAll()
+ }
info(logMsg)
}
}
}
+
+ /**
+ * Causes the current thread to wait no more than timeoutMs for the
specified condition to be met.
+ * It is guaranteed that the provided condition will always be invoked only
from within a
+ * synchronized block.
+ *
+ * @param waitCondition the condition to be waited upon:
+ * - if the condition returns true, then, the wait
will stop.
+ * - if the condition returns false, it means the
wait must continue until
+ * timeout.
+ *
+ * @param timeoutMs the timeout (in milli seconds)
+ *
+ * @throws TimeoutException if the condition is not met
within timeoutMs.
+ */
+ private def waitUntilConditionOrThrow(waitCondition: () => Boolean,
timeoutMs: Long): Unit = {
+ if(timeoutMs < 0L) {
+ throw new IllegalArgumentException(s"Expected timeoutMs >= 0, but
$timeoutMs was provided.")
+ }
+ val waitEndTimeNanos = System.nanoTime() + (timeoutMs * 1_000_000)
Review comment:
Why don't we just use `System.currentTimeMillis()` to avoid the
conversion to nanoseconds?
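For reference, a minimal Java sketch of the kind of timed wait discussed here.
`TimedWaitSketch` and its members are hypothetical stand-ins, not the Kafka
`FinalizedFeatureCache`, and only JDK calls are used. It keeps the deadline on
`System.nanoTime()` but lets `TimeUnit` do the unit conversion, so no manual
`* 1_000_000` is needed. A `System.currentTimeMillis()` deadline would have
the same loop shape.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for the cache in the diff; not the Kafka class.
class TimedWaitSketch {
    private int epoch = -1;

    // Publishes a new epoch and wakes up every waiting thread.
    synchronized void update(int newEpoch) {
        epoch = newEpoch;
        notifyAll();
    }

    synchronized int epoch() {
        return epoch;
    }

    // Blocks until the condition holds, or throws once timeoutMs has elapsed.
    // The condition is only ever evaluated while holding this object's monitor.
    synchronized void waitUntilConditionOrThrow(BooleanSupplier condition, long timeoutMs)
            throws InterruptedException, TimeoutException {
        if (timeoutMs < 0L) {
            throw new IllegalArgumentException(
                "Expected timeoutMs >= 0, but " + timeoutMs + " was provided.");
        }
        // Deadline on the elapsed-time clock; TimeUnit converts ms to ns.
        final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!condition.getAsBoolean()) {
            final long remainingNanos = deadlineNanos - System.nanoTime();
            if (remainingNanos <= 0) {
                throw new TimeoutException("Condition not met within " + timeoutMs + " ms.");
            }
            // Releases the monitor while waiting; the loop re-checks after every wake-up.
            TimeUnit.NANOSECONDS.timedWait(this, remainingNanos);
        }
    }
}
```

One trade-off to keep in mind: `System.nanoTime()` is intended for measuring
elapsed time, while `System.currentTimeMillis()` follows the wall clock, so
the choice may matter if the system clock is adjusted during the wait.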
##########
File path: core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala
##########
@@ -0,0 +1,550 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+import java.util.Properties
+import java.util.concurrent.ExecutionException
+
+import kafka.api.KAFKA_2_7_IV0
+import kafka.utils.TestUtils
+import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion}
+import kafka.utils.TestUtils.waitUntilTrue
+import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions,
FeatureMetadata, FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult}
+import org.apache.kafka.common.errors.InvalidRequestException
+import org.apache.kafka.common.feature.FinalizedVersionRange
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.message.UpdateFeaturesRequestData
+import
org.apache.kafka.common.message.UpdateFeaturesRequestData.FeatureUpdateKeyCollection
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{UpdateFeaturesRequest,
UpdateFeaturesResponse}
+import org.apache.kafka.common.utils.Utils
+import org.junit.Test
+import org.junit.Assert.{assertEquals, assertFalse, assertNotEquals,
assertNotNull, assertTrue}
+import org.scalatest.Assertions.{assertThrows, intercept}
+
+import scala.jdk.CollectionConverters._
+import scala.reflect.ClassTag
+import scala.util.matching.Regex
+
+class UpdateFeaturesTest extends BaseRequestTest {
+
+ override def brokerCount = 3
+
+ override def brokerPropertyOverrides(props: Properties): Unit = {
+ props.put(KafkaConfig.InterBrokerProtocolVersionProp,
KAFKA_2_7_IV0.toString)
+ }
+
+ private def defaultSupportedFeatures(): Features[SupportedVersionRange] = {
+ Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new
SupportedVersionRange(1, 3))))
+ }
+
+ private def defaultFinalizedFeatures(): Features[FinalizedVersionRange] = {
+ Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new
FinalizedVersionRange(1, 2))))
+ }
+
+ private def updateSupportedFeatures(
+ features: Features[SupportedVersionRange], targetServers:
Set[KafkaServer]): Unit = {
+ targetServers.foreach(s => {
+ s.brokerFeatures.setSupportedFeatures(features)
+ s.zkClient.updateBrokerInfo(s.createBrokerInfo)
+ })
+
+ // Wait until updates to all BrokerZNode supported features propagate to
the controller.
+ val brokerIds = targetServers.map(s => s.config.brokerId)
+ waitUntilTrue(
+ () => servers.exists(s => {
+ if (s.kafkaController.isActive) {
+ s.kafkaController.controllerContext.liveOrShuttingDownBrokers
+ .filter(b => brokerIds.contains(b.id))
+ .forall(b => {
+ b.features.equals(features)
+ })
+ } else {
+ false
+ }
+ }),
+ "Controller did not get broker updates")
+ }
+
+ private def updateSupportedFeaturesInAllBrokers(features:
Features[SupportedVersionRange]): Unit = {
+ updateSupportedFeatures(features, Set[KafkaServer]() ++ servers)
+ }
+
+ private def updateDefaultMinVersionLevelsInAllBrokers(newMinVersionLevels:
Map[String, Short]): Unit = {
+ servers.foreach(s => {
+ s.brokerFeatures.setDefaultMinVersionLevels(newMinVersionLevels)
+ })
+ }
+
+ private def updateFeatureZNode(features: Features[FinalizedVersionRange]):
Int = {
+ val server = serverForId(0).get
+ val newNode = new FeatureZNode(FeatureZNodeStatus.Enabled, features)
+ val newVersion = server.zkClient.updateFeatureZNode(newNode)
+ servers.foreach(s => {
+ s.featureCache.waitUntilEpochOrThrow(newVersion,
s.config.zkConnectionTimeoutMs)
+ })
+ newVersion
+ }
+
+ private def getFeatureZNode(): FeatureZNode = {
+ val (mayBeFeatureZNodeBytes, version) =
serverForId(0).get.zkClient.getDataAndVersion(FeatureZNode.path)
+ assertNotEquals(version, ZkVersion.UnknownVersion)
+ FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+ }
+
+ private def checkFeatures(client: Admin, expectedNode: FeatureZNode,
expectedMetadata: FeatureMetadata): Unit = {
+ assertEquals(expectedNode, getFeatureZNode())
+ val featureMetadata = client.describeFeatures(
+ new
DescribeFeaturesOptions().sendRequestToController(true)).featureMetadata().get()
+ assertEquals(expectedMetadata, featureMetadata)
+ }
+
+ private def checkException[ExceptionType <: Throwable](result:
UpdateFeaturesResult,
+
featureExceptionMsgPatterns: Map[String, Regex])
+ (implicit tag:
ClassTag[ExceptionType]): Unit = {
+ featureExceptionMsgPatterns.foreach {
+ case (feature, exceptionMsgPattern) =>
+ val exception = intercept[ExecutionException] {
+ result.values().get(feature).get()
+ }
+ val cause = exception.getCause
+ assertNotNull(cause)
+ assertEquals(cause.getClass, tag.runtimeClass)
+ assertTrue(cause.getMessage,
exceptionMsgPattern.findFirstIn(cause.getMessage).isDefined)
+ }
+
+ }
+
+ /**
+ * Tests whether an invalid feature update does not get processed on the
server as expected,
+ * and raises the ExceptionType on the client side as expected.
+ *
+ * @param invalidUpdate the invalid feature update to be sent in the
+ * updateFeatures request to the server
+ * @param exceptionMsgPattern a pattern for the expected exception message
+ */
+ private def testWithInvalidFeatureUpdate[ExceptionType <:
Throwable](feature: String,
+
invalidUpdate: FeatureUpdate,
+
exceptionMsgPattern: Regex)
+
(implicit tag: ClassTag[ExceptionType]): Unit = {
+ TestUtils.waitUntilControllerElected(zkClient)
+
+ updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures())
+ val versionBefore = updateFeatureZNode(defaultFinalizedFeatures())
+ val adminClient = createAdminClient()
+ val nodeBefore = getFeatureZNode()
+
+ val result = adminClient.updateFeatures(Utils.mkMap(Utils.mkEntry(feature,
invalidUpdate)), new UpdateFeaturesOptions())
+
+ checkException[ExceptionType](result, Map(feature -> exceptionMsgPattern))
+ checkFeatures(
+ adminClient,
+ nodeBefore,
+ new FeatureMetadata(defaultFinalizedFeatures(), versionBefore,
defaultSupportedFeatures()))
+ }
+
+ @Test
+ def testShouldFailRequestIfNotController(): Unit = {
+ TestUtils.waitUntilControllerElected(zkClient)
+
+ updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures())
+ val versionBefore = updateFeatureZNode(defaultFinalizedFeatures())
+
+ val nodeBefore = getFeatureZNode()
+ val updates = new FeatureUpdateKeyCollection()
+ val update = new UpdateFeaturesRequestData.FeatureUpdateKey();
+ update.setFeature("feature_1");
+
update.setMaxVersionLevel(defaultSupportedFeatures().get("feature_1").max())
+ update.setAllowDowngrade(false)
+ updates.add(update)
+
+ val response = connectAndReceive[UpdateFeaturesResponse](
+ new UpdateFeaturesRequest.Builder(new
UpdateFeaturesRequestData().setFeatureUpdates(updates)).build(),
+ notControllerSocketServer)
+
+ assertEquals(1, response.data.results.size)
+ val result = response.data.results.asScala.head
+ assertEquals("feature_1", result.feature)
+ assertEquals(Errors.NOT_CONTROLLER, Errors.forCode(result.errorCode))
+ assertNotNull(result.errorMessage)
+ assertFalse(result.errorMessage.isEmpty)
+ checkFeatures(
+ createAdminClient(),
+ nodeBefore,
+ new FeatureMetadata(defaultFinalizedFeatures(), versionBefore,
defaultSupportedFeatures()))
+ }
+
+ @Test
+ def testShouldFailRequestForEmptyUpdates(): Unit = {
+ val nullMap: util.Map[String, FeatureUpdate] = null
+ val emptyMap: util.Map[String, FeatureUpdate] = Utils.mkMap()
+ Set(nullMap, emptyMap).foreach { updates =>
+ val client = createAdminClient()
+ val exception = intercept[IllegalArgumentException] {
+ client.updateFeatures(updates, new UpdateFeaturesOptions())
+ }
+ assertNotNull(exception)
+ assertEquals("Feature updates can not be null or empty.",
exception.getMessage)
+ }
+ }
+
+ @Test
+ def testShouldFailRequestForNullUpdateFeaturesOptions(): Unit = {
+ val client = createAdminClient()
+ val update = new
FeatureUpdate(defaultSupportedFeatures().get("feature_1").max(), false)
+ val exception = intercept[NullPointerException] {
+ client.updateFeatures(Utils.mkMap(Utils.mkEntry("feature_1", update)),
null)
+ }
+ assertNotNull(exception)
+ assertEquals("UpdateFeaturesOptions can not be null", exception.getMessage)
+ }
+
+ @Test
+ def testShouldFailRequestForInvalidFeatureName(): Unit = {
+ val client = createAdminClient()
+ val update = new
FeatureUpdate(defaultSupportedFeatures().get("feature_1").max(), false)
+ val exception = intercept[IllegalArgumentException] {
+ client.updateFeatures(Utils.mkMap(Utils.mkEntry("", update)), new
UpdateFeaturesOptions())
+ }
+ assertNotNull(exception)
+ assertTrue((".*Provided feature can not be null or
empty.*"r).findFirstIn(exception.getMessage).isDefined)
+ }
+
+ @Test
+ def testShouldFailRequestWhenDowngradeFlagIsNotSetDuringDowngrade(): Unit = {
+ testWithInvalidFeatureUpdate[InvalidRequestException](
+ "feature_1",
+ new FeatureUpdate((defaultFinalizedFeatures().get("feature_1").max() -
1).asInstanceOf[Short],false),
+ ".*Can not downgrade finalized feature: 'feature_1'.*allowDowngrade.*".r)
+ }
+
+ @Test
+ def testShouldFailRequestWhenDowngradeToHigherVersionLevelIsAttempted():
Unit = {
+ testWithInvalidFeatureUpdate[InvalidRequestException](
+ "feature_1",
+ new FeatureUpdate(defaultSupportedFeatures().get("feature_1").max(),
true),
+ ".*finalized feature: 'feature_1'.*allowDowngrade.* provided
maxVersionLevel:3.*existing maxVersionLevel:2.*".r)
+ }
+
+ @Test
+ def testShouldFailRequestInClientWhenDowngradeFlagIsNotSetDuringDeletion():
Unit = {
+ assertThrows[IllegalArgumentException] {
+ new FeatureUpdate(0, false)
+ }
+ }
+
+ @Test
+ def testShouldFailRequestInServerWhenDowngradeFlagIsNotSetDuringDeletion():
Unit = {
+ TestUtils.waitUntilControllerElected(zkClient)
+
+ updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures())
+ val versionBefore = updateFeatureZNode(defaultFinalizedFeatures())
+
+ val adminClient = createAdminClient()
+ val nodeBefore = getFeatureZNode()
+
+ val updates
+ = new UpdateFeaturesRequestData.FeatureUpdateKeyCollection();
+ val update = new UpdateFeaturesRequestData.FeatureUpdateKey();
+ update.setFeature("feature_1")
+ update.setMaxVersionLevel(0)
+ update.setAllowDowngrade(false)
+ updates.add(update);
+ val requestData = new UpdateFeaturesRequestData()
+ requestData.setFeatureUpdates(updates);
+
+ val response = connectAndReceive[UpdateFeaturesResponse](
+ new UpdateFeaturesRequest.Builder(new
UpdateFeaturesRequestData().setFeatureUpdates(updates)).build(),
+ controllerSocketServer)
+
+ assertEquals(1, response.data().results().size())
+ val result = response.data.results.asScala.head
+ assertEquals("feature_1", result.feature)
+ assertEquals(Errors.INVALID_REQUEST, Errors.forCode(result.errorCode))
+ assertNotNull(result.errorMessage)
+ assertFalse(result.errorMessage.isEmpty)
+ val exceptionMsgPattern = ".*Can not provide maxVersionLevel: 0 less than
1 for feature: 'feature_1'.*allowDowngrade.*".r
+ assertTrue(exceptionMsgPattern.findFirstIn(result.errorMessage).isDefined)
+ checkFeatures(
+ adminClient,
+ nodeBefore,
+ new FeatureMetadata(defaultFinalizedFeatures(), versionBefore,
defaultSupportedFeatures()))
+ }
+
+ @Test
+ def testShouldFailRequestDuringDeletionOfNonExistingFeature(): Unit = {
+ testWithInvalidFeatureUpdate[InvalidRequestException](
+ "feature_non_existing",
+ new FeatureUpdate(0, true),
+ ".*Can not delete non-existing finalized feature:
'feature_non_existing'.*".r)
+ }
+
+ @Test
+ def testShouldFailRequestWhenUpgradingToSameVersionLevel(): Unit = {
+ testWithInvalidFeatureUpdate[InvalidRequestException](
+ "feature_1",
+ new FeatureUpdate(defaultFinalizedFeatures().get("feature_1").max(),
false),
+ ".*Can not upgrade a finalized feature: 'feature_1'.*to the same
value.*".r)
+ }
+
+ @Test
+ def testShouldFailRequestWhenDowngradingBelowMinVersionLevel(): Unit = {
+ TestUtils.waitUntilControllerElected(zkClient)
+
+ updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures())
+ val minVersionLevel = 2.asInstanceOf[Short]
+ updateDefaultMinVersionLevelsInAllBrokers(Map[String, Short]("feature_1"
-> minVersionLevel))
+ val initialFinalizedFeatures = Features.finalizedFeatures(
+ Utils.mkMap(Utils.mkEntry("feature_1", new
FinalizedVersionRange(minVersionLevel, 2))))
+ val versionBefore = updateFeatureZNode(initialFinalizedFeatures)
+
+ val update = new FeatureUpdate((minVersionLevel - 1).asInstanceOf[Short],
true)
+ val adminClient = createAdminClient()
+ val nodeBefore = getFeatureZNode()
+
+ val result = adminClient.updateFeatures(
+ Utils.mkMap(Utils.mkEntry("feature_1", update)), new
UpdateFeaturesOptions())
+
+ checkException[InvalidRequestException](
+ result,
+ Map("feature_1" -> ".*Can not downgrade finalized feature: 'feature_1'
to maxVersionLevel:1.*existing minVersionLevel:2.*".r))
+ checkFeatures(
+ adminClient,
+ nodeBefore,
+ new FeatureMetadata(initialFinalizedFeatures, versionBefore,
defaultSupportedFeatures()))
+ }
+
+ @Test
+ def testShouldFailRequestDuringBrokerMaxVersionLevelIncompatibility(): Unit
= {
+ TestUtils.waitUntilControllerElected(zkClient)
+
+ val controller = servers.filter { server =>
server.kafkaController.isActive}.head
+ val nonControllerServers = servers.filter { server =>
!server.kafkaController.isActive}
+ val unsupportedBrokers = Set[KafkaServer](nonControllerServers.head)
+ val supportedBrokers = Set[KafkaServer](nonControllerServers(1),
controller)
+
+ updateSupportedFeatures(defaultSupportedFeatures(), supportedBrokers)
+
+ val validMinVersion = defaultSupportedFeatures().get("feature_1").min()
+ val unsupportedMaxVersion =
+ (defaultSupportedFeatures().get("feature_1").max() -
1).asInstanceOf[Short]
+ val badSupportedFeatures = Features.supportedFeatures(
+ Utils.mkMap(
+ Utils.mkEntry("feature_1",
+ new SupportedVersionRange(
+ validMinVersion,
+ unsupportedMaxVersion))))
+ updateSupportedFeatures(badSupportedFeatures, unsupportedBrokers)
+
+ val versionBefore = updateFeatureZNode(defaultFinalizedFeatures())
+
+ val invalidUpdate = new
FeatureUpdate(defaultSupportedFeatures().get("feature_1").max(), false)
+ val nodeBefore = getFeatureZNode()
+ val adminClient = createAdminClient()
+ val result = adminClient.updateFeatures(
+ Utils.mkMap(Utils.mkEntry("feature_1", invalidUpdate)),
+ new UpdateFeaturesOptions())
+
+ checkException[InvalidRequestException](result, Map("feature_1" -> ".*1
broker.*incompatible.*".r))
+ checkFeatures(
+ adminClient,
+ nodeBefore,
+ new FeatureMetadata(defaultFinalizedFeatures(), versionBefore,
defaultSupportedFeatures()))
+ }
+
+ @Test
+ def testSuccessfulFeatureUpgradeAndWithNoExistingFinalizedFeatures(): Unit =
{
+ TestUtils.waitUntilControllerElected(zkClient)
+
+ updateSupportedFeaturesInAllBrokers(
+ Features.supportedFeatures(
+ Utils.mkMap(
+ Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3)),
+ Utils.mkEntry("feature_2", new SupportedVersionRange(2, 5)))))
+ updateDefaultMinVersionLevelsInAllBrokers(Map[String, Short]("feature_1"
-> 1, "feature_2" -> 2))
+ val versionBefore = updateFeatureZNode(Features.emptyFinalizedFeatures())
+
+ val targetFinalizedFeatures = Features.finalizedFeatures(
+ Utils.mkMap(
+ Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 3)),
+ Utils.mkEntry("feature_2", new FinalizedVersionRange(2, 3))))
+ val update1 = new
FeatureUpdate(targetFinalizedFeatures.get("feature_1").max(), false)
+ val update2 = new
FeatureUpdate(targetFinalizedFeatures.get("feature_2").max(), false)
+
+ val expected = new FeatureMetadata(
+ targetFinalizedFeatures,
+ versionBefore + 1,
+ Features.supportedFeatures(
+ Utils.mkMap(
+ Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3)),
+ Utils.mkEntry("feature_2", new SupportedVersionRange(2, 5)))))
+
+ val adminClient = createAdminClient()
+ adminClient.updateFeatures(
+ Utils.mkMap(Utils.mkEntry("feature_1", update1),
Utils.mkEntry("feature_2", update2)),
+ new UpdateFeaturesOptions()
+ ).all().get()
+
+ checkFeatures(
+ adminClient,
+ new FeatureZNode(FeatureZNodeStatus.Enabled, targetFinalizedFeatures),
+ expected)
+ }
+
+ @Test
+ def testSuccessfulFeatureUpgradeAndDowngrade(): Unit = {
+ TestUtils.waitUntilControllerElected(zkClient)
+
+ updateSupportedFeaturesInAllBrokers(
+ Features.supportedFeatures(
+ Utils.mkMap(
+ Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3)),
+ Utils.mkEntry("feature_2", new SupportedVersionRange(2, 5)))))
+ updateDefaultMinVersionLevelsInAllBrokers(Map[String, Short]("feature_1"
-> 1, "feature_2" -> 2))
+ val versionBefore = updateFeatureZNode(
+ Features.finalizedFeatures(
+ Utils.mkMap(
+ Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 2)),
+ Utils.mkEntry("feature_2", new FinalizedVersionRange(2, 4)))))
+
+ val targetFinalizedFeatures = Features.finalizedFeatures(
+ Utils.mkMap(
+ Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 3)),
+ Utils.mkEntry("feature_2", new FinalizedVersionRange(2, 3))))
+ val update1 = new
FeatureUpdate(targetFinalizedFeatures.get("feature_1").max(), false)
+ val update2 = new
FeatureUpdate(targetFinalizedFeatures.get("feature_2").max(), true)
+
+ val expected = new FeatureMetadata(
+ targetFinalizedFeatures,
+ versionBefore + 1,
+ Features.supportedFeatures(
+ Utils.mkMap(
+ Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3)),
+ Utils.mkEntry("feature_2", new SupportedVersionRange(2, 5)))))
+
+ val adminClient = createAdminClient()
+ adminClient.updateFeatures(
+ Utils.mkMap(Utils.mkEntry("feature_1", update1),
Utils.mkEntry("feature_2", update2)),
+ new UpdateFeaturesOptions()
+ ).all().get()
+
+ checkFeatures(
+ adminClient,
+ new FeatureZNode(FeatureZNodeStatus.Enabled, targetFinalizedFeatures),
+ expected)
+ }
+
+ @Test
+ def testPartialSuccessDuringValidFeatureUpgradeAndInvalidDowngrade(): Unit =
{
+ TestUtils.waitUntilControllerElected(zkClient)
+
+ val initialSupportedFeatures = Features.supportedFeatures(
Review comment:
nit: this could be extracted as a common struct.
##########
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##########
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "apiKey": 50,
+ "type": "request",
+ "name": "UpdateFeaturesRequest",
+ "validVersions": "0",
+ "flexibleVersions": "0+",
+ "fields": [
+ { "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+",
+ "about": "The list of updates to finalized features.", "fields": [
+ {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true,
+ "about": "The name of the finalized feature to be updated."},
+ {"name": "MaxVersionLevel", "type": "int16", "versions": "0+",
Review comment:
Same here
##########
File path:
clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.FinalizedVersionRange;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+
+/**
+ * Encapsulates details about finalized as well as supported features. This is
particularly useful
+ * to hold the result returned by the {@link
Admin#describeFeatures(DescribeFeaturesOptions)} API.
+ */
+public class FeatureMetadata {
+
+ private final Features<FinalizedVersionRange> finalizedFeatures;
+
+ private final Optional<Integer> finalizedFeaturesEpoch;
+
+ private final Features<SupportedVersionRange> supportedFeatures;
+
+ public FeatureMetadata(
Review comment:
Try to put the first parameter on the same line as the constructor name, and
align the remaining parameters with it.
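A minimal sketch of the suggested layout, using a hypothetical
`FeatureMetadataSketch` class with `String` placeholders instead of the real
`Features<...>` types (both are assumptions made only to keep the example
self-contained):

```java
import java.util.Objects;
import java.util.Optional;

public class FeatureMetadataSketch {
    private final String finalizedFeatures;
    private final Optional<Integer> finalizedFeaturesEpoch;
    private final String supportedFeatures;

    // First parameter stays on the constructor line; the rest are aligned beneath it.
    public FeatureMetadataSketch(final String finalizedFeatures,
                                 final int finalizedFeaturesEpoch,
                                 final String supportedFeatures) {
        this.finalizedFeatures = Objects.requireNonNull(finalizedFeatures);
        // Mirrors the quoted diff: a negative epoch is treated as absent.
        this.finalizedFeaturesEpoch =
            finalizedFeaturesEpoch >= 0 ? Optional.of(finalizedFeaturesEpoch) : Optional.empty();
        this.supportedFeatures = Objects.requireNonNull(supportedFeatures);
    }

    public Optional<Integer> finalizedFeaturesEpoch() {
        return finalizedFeaturesEpoch;
    }
}
```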
##########
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##########
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "apiKey": 50,
+ "type": "request",
+ "name": "UpdateFeaturesRequest",
+ "validVersions": "0",
+ "flexibleVersions": "0+",
+ "fields": [
+ { "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+",
+ "about": "The list of updates to finalized features.", "fields": [
+ {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true,
Review comment:
Space
##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1214,6 +1215,71 @@ default AlterClientQuotasResult
alterClientQuotas(Collection<ClientQuotaAlterati
*/
AlterClientQuotasResult
alterClientQuotas(Collection<ClientQuotaAlteration> entries,
AlterClientQuotasOptions options);
+ /**
+ * Describes finalized as well as supported features. By default, the
request is issued to any
+ * broker. It can be optionally directed only to the controller via
DescribeFeaturesOptions
+ * parameter. This is particularly useful if the user requires strongly
consistent reads of
+ * finalized features.
+ * <p>
+ * The following exceptions can be anticipated when calling {@code get()}
on the future from the
+ * returned {@link DescribeFeaturesResult}:
+ * <ul>
+ * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+ * If the request timed out before the describe operation could
finish.</li>
+ * </ul>
+ * <p>
+ * @param options the options to use
+ *
+ * @return the {@link DescribeFeaturesResult} containing the
result
+ */
+ DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+
+ /**
+ * Applies specified updates to finalized features. This operation is not
transactional so it
+ * may succeed for some features while fail for others.
+ * <p>
+ * The API takes in a map of finalized feature name to {@link
FeatureUpdate} that needs to be
+ * applied. Each entry in the map specifies the finalized feature to be
added or updated or
+ * deleted, along with the new max feature version level value. This
request is issued only to
+ * the controller since the API is only served by the controller. The
return value contains an
+ * error code for each supplied {@link FeatureUpdate}, and the code
indicates if the update
+ * succeeded or failed in the controller.
+ * <ul>
+ * <li>Downgrade of feature version level is not a regular
operation/intent. It is only allowed
+ * in the controller if the {@link FeatureUpdate} has the allowDowngrade
flag set - setting this
+ * flag conveys user intent to attempt downgrade of a feature max version
level. Note that
+ * despite the allowDowngrade flag being set, certain downgrades may be
rejected by the
+ * controller if it is deemed impossible.</li>
+ * <li>Deletion of a finalized feature version is not a regular
operation/intent. It could be
+ * done by setting the allowDowngrade flag to true in the {@link
FeatureUpdate}, and, setting
+ * the max version level to be less than 1.</li>
+ * </ul>
+ *<p>
+ * The following exceptions can be anticipated when calling {@code get()}
on the futures
+ * obtained from the returned {@link UpdateFeaturesResult}:
+ * <ul>
+ * <li>{@link
org.apache.kafka.common.errors.ClusterAuthorizationException}
+ * If the authenticated user didn't have alter access to the
cluster.</li>
+ * <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
Review comment:
Should this be a per-feature error or a top-level error?
##########
File path: clients/src/main/resources/common/message/UpdateFeaturesResponse.json
##########
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "apiKey": 50,
+ "type": "response",
+ "name": "UpdateFeaturesResponse",
+ "validVersions": "0",
+ "flexibleVersions": "0+",
+ "fields": [
Review comment:
For a top-level exception such as a cluster authorization exception, we
could just define a top-level error code instead of marking every feature
with the same redundant error code. I know we have been a bit inconsistent in
such cases, but I personally feel that layered error codes make it clear to the
response handler whether a failure is a per-feature issue or a request-level issue.
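To make the layered idea concrete, here is a rough sketch of how a client
could resolve the outcome per feature when the response carries both a
top-level code and per-feature codes. The class `LayeredErrorsDemo`, the
`resolve` helper, and the numeric codes are all hypothetical and only
illustrate the shape of the handling; the real schema and error values may
differ.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LayeredErrorsDemo {
    // Illustrative codes only (assumption); real values live in Kafka's Errors enum.
    public static final short NONE = 0;
    public static final short UNKNOWN = -1;

    // Returns the effective error code for every requested feature.
    public static Map<String, Short> resolve(final short topLevelError,
                                             final Map<String, Short> perFeatureErrors,
                                             final List<String> requestedFeatures) {
        final Map<String, Short> outcome = new LinkedHashMap<>();
        for (final String feature : requestedFeatures) {
            if (topLevelError != NONE) {
                // A request-level failure (e.g. authorization) applies to every feature.
                outcome.put(feature, topLevelError);
            } else {
                // Otherwise each feature reports its own result; a missing entry is unknown.
                outcome.put(feature, perFeatureErrors.getOrDefault(feature, UNKNOWN));
            }
        }
        return outcome;
    }
}
```

With this shape, the handler checks the top-level code once and only falls
back to the per-feature codes when the request as a whole was accepted.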
##########
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##########
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "apiKey": 50,
+ "type": "request",
+ "name": "UpdateFeaturesRequest",
+ "validVersions": "0",
+ "flexibleVersions": "0+",
+ "fields": [
+ { "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+",
+ "about": "The list of updates to finalized features.", "fields": [
+ {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true,
+ "about": "The name of the finalized feature to be updated."},
+ {"name": "MaxVersionLevel", "type": "int16", "versions": "0+",
+ "about": "The new maximum version level for the finalized feature. A
value >= 1 is valid. A value < 1, is special, and can be used to request the
deletion of the finalized feature."},
+ {"name": "AllowDowngrade", "type": "bool", "versions": "0+",
Review comment:
Same here
##########
File path:
clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for {@link AdminClient#describeFeatures(DescribeFeaturesOptions)}
+ *
+ * The API of this class is evolving. See {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeFeaturesOptions extends
AbstractOptions<DescribeFeaturesOptions> {
+
+ /**
+ * - True means the {@link
Admin#describeFeatures(DescribeFeaturesOptions)} request can be
+ * issued only to the controller.
+ * - False means the {@link
Admin#describeFeatures(DescribeFeaturesOptions)} request can be
Review comment:
`could be processed by any random broker`
##########
File path:
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4052,6 +4058,128 @@ void handleFailure(Throwable throwable) {
return new
AlterClientQuotasResult(Collections.unmodifiableMap(futures));
}
+ @Override
+ public DescribeFeaturesResult describeFeatures(final
DescribeFeaturesOptions options) {
+ final KafkaFutureImpl<FeatureMetadata> future = new
KafkaFutureImpl<>();
+ final long now = time.milliseconds();
+ final NodeProvider provider =
+ options.sendRequestToController() ? new ControllerNodeProvider() :
new LeastLoadedNodeProvider();
+
+ Call call = new Call(
+ "describeFeatures", calcDeadlineMs(now, options.timeoutMs()),
provider) {
+
+ @Override
+ ApiVersionsRequest.Builder createRequest(int timeoutMs) {
+ return new ApiVersionsRequest.Builder();
+ }
+
+ @Override
+ void handleResponse(AbstractResponse response) {
+ final ApiVersionsResponse apiVersionsResponse =
(ApiVersionsResponse) response;
+ if (apiVersionsResponse.data.errorCode() ==
Errors.NONE.code()) {
+ future.complete(
+ new FeatureMetadata(
+ apiVersionsResponse.finalizedFeatures(),
+ apiVersionsResponse.finalizedFeaturesEpoch(),
+ apiVersionsResponse.supportedFeatures()));
+ } else if (options.sendRequestToController() &&
apiVersionsResponse.data.errorCode() == Errors.NOT_CONTROLLER.code()) {
+ handleNotControllerError(Errors.NOT_CONTROLLER);
+ } else {
+ future.completeExceptionally(
+
Errors.forCode(apiVersionsResponse.data.errorCode()).exception());
+ }
+ }
+
+ @Override
+ void handleFailure(Throwable throwable) {
+ completeAllExceptionally(Collections.singletonList(future),
throwable);
+ }
+ };
+
+ runnable.call(call, now);
+ return new DescribeFeaturesResult(future);
+ }
+
+ @Override
+ public UpdateFeaturesResult updateFeatures(
+ final Map<String, FeatureUpdate> featureUpdates, final
UpdateFeaturesOptions options) {
+ if (featureUpdates == null || featureUpdates.isEmpty()) {
+ throw new IllegalArgumentException("Feature updates can not be
null or empty.");
+ }
+ Objects.requireNonNull(options, "UpdateFeaturesOptions can not be
null");
+
+ final Map<String, KafkaFutureImpl<Void>> updateFutures = new
HashMap<>();
+ final UpdateFeaturesRequestData.FeatureUpdateKeyCollection
featureUpdatesRequestData
+ = new UpdateFeaturesRequestData.FeatureUpdateKeyCollection();
+ for (Map.Entry<String, FeatureUpdate> entry :
featureUpdates.entrySet()) {
+ final String feature = entry.getKey();
+ final FeatureUpdate update = entry.getValue();
+ if (feature.trim().isEmpty()) {
+ throw new IllegalArgumentException("Provided feature can not
be null or empty.");
+ }
+
+ updateFutures.put(feature, new KafkaFutureImpl<>());
+ final UpdateFeaturesRequestData.FeatureUpdateKey requestItem =
+ new UpdateFeaturesRequestData.FeatureUpdateKey();
+ requestItem.setFeature(feature);
+ requestItem.setMaxVersionLevel(update.maxVersionLevel());
+ requestItem.setAllowDowngrade(update.allowDowngrade());
+ featureUpdatesRequestData.add(requestItem);
+ }
+ final UpdateFeaturesRequestData request = new
UpdateFeaturesRequestData().setFeatureUpdates(featureUpdatesRequestData);
+
+ final long now = time.milliseconds();
+ final Call call = new Call("updateFeatures", calcDeadlineMs(now,
options.timeoutMs()),
+ new ControllerNodeProvider()) {
+
+ @Override
+ UpdateFeaturesRequest.Builder createRequest(int timeoutMs) {
+ return new UpdateFeaturesRequest.Builder(request);
+ }
+
+ @Override
+ void handleResponse(AbstractResponse abstractResponse) {
+ final UpdateFeaturesResponse response =
+ (UpdateFeaturesResponse) abstractResponse;
+
+ // Check for controller change.
+ for (UpdatableFeatureResult result :
response.data().results()) {
+ final Errors error = Errors.forCode(result.errorCode());
+ if (error == Errors.NOT_CONTROLLER) {
+ handleNotControllerError(error);
+ throw error.exception();
+ }
+ }
+
+ for (UpdatableFeatureResult result :
response.data().results()) {
+ final KafkaFutureImpl<Void> future =
updateFutures.get(result.feature());
+ if (future == null) {
Review comment:
Does this overlap with the `completeUnrealizedFutures` check? We could keep
just one of them to reduce the checking complexity.
##########
File path:
clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.FinalizedVersionRange;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+
+/**
+ * Encapsulates details about finalized as well as supported features. This is
particularly useful
+ * to hold the result returned by the {@link
Admin#describeFeatures(DescribeFeaturesOptions)} API.
+ */
+public class FeatureMetadata {
+
+ private final Features<FinalizedVersionRange> finalizedFeatures;
+
+ private final Optional<Integer> finalizedFeaturesEpoch;
+
+ private final Features<SupportedVersionRange> supportedFeatures;
+
+ public FeatureMetadata(
+ final Features<FinalizedVersionRange> finalizedFeatures,
+ final int finalizedFeaturesEpoch,
+ final Features<SupportedVersionRange> supportedFeatures) {
+ Objects.requireNonNull(finalizedFeatures, "Provided finalizedFeatures
can not be null.");
+ Objects.requireNonNull(supportedFeatures, "Provided supportedFeatures
can not be null.");
+ this.finalizedFeatures = finalizedFeatures;
+ if (finalizedFeaturesEpoch >= 0) {
+ this.finalizedFeaturesEpoch = Optional.of(finalizedFeaturesEpoch);
+ } else {
+ this.finalizedFeaturesEpoch = Optional.empty();
+ }
+ this.supportedFeatures = supportedFeatures;
+ }
+
+ /**
+ * A map of finalized feature versions, with key being finalized feature
name and value
+ * containing the min/max version levels for the finalized feature.
+ */
+ public Features<FinalizedVersionRange> finalizedFeatures() {
+ return finalizedFeatures;
+ }
+
+ /**
+ * The epoch for the finalized features.
+ * If the returned value is empty, it means the finalized features are
absent/unavailable.
+ */
+ public Optional<Integer> finalizedFeaturesEpoch() {
+ return finalizedFeaturesEpoch;
+ }
+
+ /**
+ * A map of supported feature versions, with key being supported feature
name and value
+ * containing the min/max version for the supported feature.
+ */
+ public Features<SupportedVersionRange> supportedFeatures() {
+ return supportedFeatures;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (!(other instanceof FeatureMetadata)) {
+ return false;
+ }
+
+ final FeatureMetadata that = (FeatureMetadata) other;
+ return Objects.equals(this.finalizedFeatures, that.finalizedFeatures)
&&
+ Objects.equals(this.finalizedFeaturesEpoch,
that.finalizedFeaturesEpoch) &&
+ Objects.equals(this.supportedFeatures, that.supportedFeatures);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(finalizedFeatures, finalizedFeaturesEpoch,
supportedFeatures);
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "FeatureMetadata{finalized:%s, finalizedFeaturesEpoch:%d,
supported:%s}",
+ finalizedFeatures,
+ finalizedFeaturesEpoch,
Review comment:
This won't work well with `String.format`: `%d` cannot format an `Optional`.
Consider unwrapping it with `orElse`.
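A small sketch of the problem and one possible fix. The class
`EpochFormatDemo` and the `-1` sentinel for an absent epoch are assumptions
made for illustration, not part of the PR:

```java
import java.util.IllegalFormatConversionException;
import java.util.Optional;

public class EpochFormatDemo {
    // Unwraps the Optional first so that %d receives an Integer.
    public static String describe(final Optional<Integer> finalizedFeaturesEpoch) {
        return String.format("finalizedFeaturesEpoch:%d", finalizedFeaturesEpoch.orElse(-1));
    }

    // Returns true if passing the Optional itself to %d is rejected by the formatter.
    public static boolean rawOptionalFails(final Optional<Integer> epoch) {
        try {
            String.format("finalizedFeaturesEpoch:%d", epoch);
            return false;
        } catch (IllegalFormatConversionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(Optional.of(5)));
        System.out.println(describe(Optional.empty()));
        System.out.println(rawOptionalFails(Optional.of(5)));
    }
}
```

An alternative would be to switch the placeholder to `%s` and print the
`Optional` as-is; which form reads better in `toString()` is a judgment call.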
##########
File path:
clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for {@link AdminClient#describeFeatures(DescribeFeaturesOptions)}
+ *
+ * The API of this class is evolving. See {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeFeaturesOptions extends
AbstractOptions<DescribeFeaturesOptions> {
+
+ /**
+ * - True means the {@link
Admin#describeFeatures(DescribeFeaturesOptions)} request can be
+ * issued only to the controller.
+ * - False means the {@link
Admin#describeFeatures(DescribeFeaturesOptions)} request can be
+ * issued to any random broker.
+ */
+ private boolean sendRequestToController = false;
+
+ /**
+ * Sets a flag indicating that the describe features request should be
issued to the controller.
Review comment:
Same here
##########
File path:
clients/src/main/java/org/apache/kafka/clients/admin/FeatureUpdate.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Encapsulates details about an update to a finalized feature. This is
particularly useful to
+ * define each feature update in the {@link Admin#updateFeatures(Map,
UpdateFeaturesOptions)} API.
+ */
+public class FeatureUpdate {
+ private final short maxVersionLevel;
+ private final boolean allowDowngrade;
+
+ /**
+ * @param maxVersionLevel the new maximum version level for the
finalized feature.
+ * a value < 1 is special and indicates that the
update is intended to
+ * delete the finalized feature, and should be
accompanied by setting
+ * the allowDowngrade flag to true.
+ * @param allowDowngrade - true, if this feature update was meant to
downgrade the existing
+ * maximum version level of the finalized
feature.
+ * - false, otherwise.
+ */
+ public FeatureUpdate(final short maxVersionLevel, final boolean
allowDowngrade) {
+ if (maxVersionLevel < 1 && !allowDowngrade) {
+ throw new IllegalArgumentException(String.format(
+ "The allowDowngrade flag should be set when the provided
maxVersionLevel:%d is < 1.",
+ maxVersionLevel));
+ }
+ this.maxVersionLevel = maxVersionLevel;
+ this.allowDowngrade = allowDowngrade;
+ }
+
+ public short maxVersionLevel() {
+ return maxVersionLevel;
+ }
+
+ public boolean allowDowngrade() {
+ return allowDowngrade;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+
+ if (!(other instanceof FeatureUpdate)) {
+ return false;
+ }
+
+ final FeatureUpdate that = (FeatureUpdate) other;
+ return this.maxVersionLevel == that.maxVersionLevel &&
this.allowDowngrade == that.allowDowngrade;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(maxVersionLevel, allowDowngrade);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("FeatureUpdate{maxVersionLevel:%d,
allowDowngrade:%s}", maxVersionLevel, allowDowngrade);
+ }
+}
Review comment:
Missing newline at the end of the file.
##########
File path:
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -3193,6 +3238,104 @@ public void testListOffsetsNonRetriableErrors() throws
Exception {
}
}
+ @Test
+ public void testUpdateFeaturesDuringSuccess() throws Exception {
+ testUpdateFeaturesDuringError(Errors.NONE);
+ }
+
+ @Test
+ public void testUpdateFeaturesInvalidRequestError() throws Exception {
+ testUpdateFeaturesDuringError(Errors.INVALID_REQUEST);
+ }
+
+ @Test
+ public void testUpdateFeaturesUpdateFailedError() throws Exception {
+ testUpdateFeaturesDuringError(Errors.FEATURE_UPDATE_FAILED);
+ }
+
+ private void testUpdateFeaturesDuringError(Errors error) throws Exception {
+ try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+ env.kafkaClient().prepareResponse(
+ body -> body instanceof UpdateFeaturesRequest,
+ prepareUpdateFeaturesResponse(error));
+ final KafkaFuture<Void> future = env.adminClient().updateFeatures(
+ new HashSet<>(
+ Arrays.asList(
+ new FeatureUpdate(
+ "test_feature_1", (short) 2, false),
+ new FeatureUpdate(
+ "test_feature_2", (short) 3, true))),
+ new UpdateFeaturesOptions().timeoutMs(10000)).result();
+ if (error.exception() == null) {
+ future.get();
+ } else {
+ final ExecutionException e =
assertThrows(ExecutionException.class,
+ () -> future.get());
+ assertEquals(e.getCause().getClass(),
error.exception().getClass());
+ }
+ }
+ }
+
+ @Test
+ public void testUpdateFeaturesHandleNotControllerException() throws
Exception {
+ try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+ env.kafkaClient().prepareResponseFrom(
+ prepareUpdateFeaturesResponse(Errors.NOT_CONTROLLER),
+ env.cluster().nodeById(0));
+
env.kafkaClient().prepareResponse(MetadataResponse.prepareResponse(env.cluster().nodes(),
+ env.cluster().clusterResource().clusterId(),
+ 1,
+ Collections.<MetadataResponse.TopicMetadata>emptyList()));
+ env.kafkaClient().prepareResponseFrom(
Review comment:
You are right, it seems unnecessary.
##########
File path:
clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.message.UpdateFeaturesResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+
+/**
+ * Possible error codes:
+ *
+ * - {@link Errors#CLUSTER_AUTHORIZATION_FAILED}
+ * - {@link Errors#NOT_CONTROLLER}
+ * - {@link Errors#INVALID_REQUEST}
+ * - {@link Errors#FEATURE_UPDATE_FAILED}
+ */
+public class UpdateFeaturesResponse extends AbstractResponse {
+
+ private final UpdateFeaturesResponseData data;
+
+ public UpdateFeaturesResponse(UpdateFeaturesResponseData data) {
+ this.data = data;
+ }
+
+ public UpdateFeaturesResponse(Struct struct) {
+ final short latestVersion = (short)
(UpdateFeaturesResponseData.SCHEMAS.length - 1);
+ this.data = new UpdateFeaturesResponseData(struct, latestVersion);
+ }
+
+ public UpdateFeaturesResponse(Struct struct, short version) {
+ this.data = new UpdateFeaturesResponseData(struct, version);
+ }
+
+ public Map<String, ApiError> errors() {
+ return data.results().valuesSet().stream().collect(
+ Collectors.toMap(
+ result -> result.feature(),
Review comment:
nit: could be replaced with a method reference
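Since `result -> result.feature()` is already a lambda, the shorter form
meant here is presumably a method reference. A sketch under that assumption,
with a hypothetical `Result` class standing in for the generated
per-feature result type:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class MethodRefDemo {
    // Hypothetical stand-in for a per-feature result entry.
    public static final class Result {
        private final String feature;
        private final short errorCode;

        public Result(final String feature, final short errorCode) {
            this.feature = feature;
            this.errorCode = errorCode;
        }

        public String feature() {
            return feature;
        }

        public short errorCode() {
            return errorCode;
        }
    }

    // Result::feature replaces the equivalent lambda result -> result.feature().
    public static Map<String, Short> errorCodes(final List<Result> results) {
        return results.stream()
            .collect(Collectors.toMap(Result::feature, Result::errorCode));
    }
}
```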
##########
File path:
clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##########
@@ -143,7 +172,13 @@ public static ApiVersionsResponse apiVersionsResponse(
Features<FinalizedVersionRange> finalizedFeatures,
int finalizedFeaturesEpoch) {
if (maxMagic == RecordBatch.CURRENT_MAGIC_VALUE && throttleTimeMs ==
DEFAULT_THROTTLE_TIME) {
- return DEFAULT_API_VERSIONS_RESPONSE;
+ return new ApiVersionsResponse(createApiVersionsResponseData(
+ DEFAULT_API_VERSIONS_RESPONSE.throttleTimeMs(),
+
Errors.forCode(DEFAULT_API_VERSIONS_RESPONSE.data().errorCode()),
+ DEFAULT_API_VERSIONS_RESPONSE.data().apiKeys(),
+ latestSupportedFeatures,
+ finalizedFeatures,
+ finalizedFeaturesEpoch));
Review comment:
Commenting here since there is no better place: `createApiVersionsResponse`
on L198 could be made private.
##########
File path:
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -3615,6 +3662,137 @@ public void testListOffsetsNonRetriableErrors() throws
Exception {
}
}
+ @Test
+ public void testUpdateFeaturesDuringSuccess() throws Exception {
+ testUpdateFeatures(
+ makeTestFeatureUpdates(),
+ makeTestFeatureUpdateErrors(Errors.NONE));
+ }
+
+ @Test
+ public void testUpdateFeaturesInvalidRequestError() throws Exception {
+ testUpdateFeatures(
+ makeTestFeatureUpdates(),
+ makeTestFeatureUpdateErrors(Errors.INVALID_REQUEST));
+ }
+
+ @Test
+ public void testUpdateFeaturesUpdateFailedError() throws Exception {
+ testUpdateFeatures(
+ makeTestFeatureUpdates(),
+ makeTestFeatureUpdateErrors(Errors.FEATURE_UPDATE_FAILED));
+ }
+
+ @Test
+ public void testUpdateFeaturesPartialSuccess() throws Exception {
+ final Map<String, Errors> errors =
makeTestFeatureUpdateErrors(Errors.NONE);
+ errors.put("test_feature_2", Errors.INVALID_REQUEST);
+ testUpdateFeatures(makeTestFeatureUpdates(), errors);
+ }
+
+ private Map<String, FeatureUpdate> makeTestFeatureUpdates() {
+ return Utils.mkMap(
+ Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2,
false)),
+ Utils.mkEntry("test_feature_2", new FeatureUpdate((short) 3,
true)));
+ }
+
+ private Map<String, Errors> makeTestFeatureUpdateErrors(final Errors
error) {
+ final Map<String, FeatureUpdate> updates = makeTestFeatureUpdates();
+ final Map<String, Errors> errors = new HashMap<>();
+ for (Map.Entry<String, FeatureUpdate> entry : updates.entrySet()) {
+ errors.put(entry.getKey(), error);
+ }
+ return errors;
+ }
+
+ private void testUpdateFeatures(Map<String, FeatureUpdate> featureUpdates,
+ Map<String, Errors> featureUpdateErrors)
throws Exception {
+ try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+ env.kafkaClient().prepareResponse(
+ body -> body instanceof UpdateFeaturesRequest,
+ prepareUpdateFeaturesResponse(featureUpdateErrors));
+ final Map<String, KafkaFuture<Void>> futures =
env.adminClient().updateFeatures(
+ featureUpdates,
+ new UpdateFeaturesOptions().timeoutMs(10000)).values();
+ for (Map.Entry<String, KafkaFuture<Void>> entry :
futures.entrySet()) {
+ final KafkaFuture<Void> future = entry.getValue();
+ final Errors error = featureUpdateErrors.get(entry.getKey());
+ if (error == Errors.NONE) {
+ future.get();
+ } else {
+ final ExecutionException e =
assertThrows(ExecutionException.class,
+ () -> future.get());
Review comment:
nit: could use a method reference (`future::get`)
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -266,6 +275,178 @@ class KafkaController(val config: KafkaConfig,
}
}
+ private def createFeatureZNode(newNode: FeatureZNode): Int = {
+ info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents:
$newNode")
+ zkClient.createFeatureZNode(newNode)
+ val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+ newVersion
+ }
+
+ private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+ info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents:
$updatedNode")
+ zkClient.updateFeatureZNode(updatedNode)
+ }
+
+ /**
+ * This method enables the feature versioning system (KIP-584).
+ *
+ * Development in Kafka (from a high level) is organized into features. Each
feature is tracked by
+ * a name and a range of version numbers. A feature can be of two types:
+ *
+ * 1. Supported feature:
+ * A supported feature is represented by a name (String) and a range of
versions (defined by a
+ * {@link SupportedVersionRange}). It refers to a feature that a particular
broker advertises
+ * support for. Each broker advertises the version ranges of its own
supported features in its
+ * own BrokerIdZNode. The contents of the advertisement are specific to the
particular broker and
+ * do not represent any guarantee of a cluster-wide availability of the
feature for any particular
+ * range of versions.
+ *
+ * 2. Finalized feature:
+ * A finalized feature is represented by a name (String) and a range of
version levels (defined
+ * by a {@link FinalizedVersionRange}). Whenever the feature versioning
system (KIP-584) is
+ * enabled, the finalized features are stored in the cluster-wide common
FeatureZNode.
+ * In comparison to a supported feature, the key difference is that a
finalized feature exists
+ * in ZK only when it is guaranteed to be supported by any random broker in
the cluster for a
+ * specified range of version levels. Also, the controller is the only
entity modifying the
+ * information about finalized features.
+ *
+ * This method sets up the FeatureZNode with enabled status, which means
that the finalized
+ * features stored in the FeatureZNode are active. The enabled status should
be written by the
+ * controller to the FeatureZNode only when the broker IBP config is greater
than or equal to
+ * KAFKA_2_7_IV0.
+ *
+ * There are multiple cases handled here:
+ *
+ * 1. New cluster bootstrap:
+ * A new Kafka cluster (i.e. it is deployed first time) is almost always
started with IBP config
+ * setting greater than or equal to KAFKA_2_7_IV0. We would like to start
the cluster with all
+ * the possible supported features finalized immediately. Assuming this
is the case, the
+ * controller will start up and notice that the FeatureZNode is absent in
the new cluster,
+ * it will then create a FeatureZNode (with enabled status) containing
the entire list of
+ * default supported features as its finalized features.
+ *
+ * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+ * Imagine there is an existing Kafka cluster with IBP config less than
KAFKA_2_7_IV0, and the
+ * broker binary has been upgraded to a newer version that supports the
feature versioning
+ * system (KIP-584). This means the user is upgrading from an earlier
version of the broker
+ * binary. In this case, we want to start with no finalized features and
allow the user to
+ * finalize them whenever they are ready i.e. in the future whenever the
user sets IBP config
+ * to be greater than or equal to KAFKA_2_7_IV0, then the user could
start finalizing the
+ * features. This process ensures we do not enable all the possible
features immediately after
+ * an upgrade, which could be harmful to Kafka.
+ * This is how we handle such a case:
+ * - Before the IBP config upgrade (i.e. IBP config set to less than
KAFKA_2_7_IV0), the
+ * controller will start up and check if the FeatureZNode is absent.
If absent, it will
+ * react by creating a FeatureZNode with disabled status and empty
finalized features.
+ * Otherwise, if a node already exists in enabled status then the
controller will just
+ * flip the status to disabled and clear the finalized features.
+ * - After the IBP config upgrade (i.e. IBP config set to greater than
or equal to
+ * KAFKA_2_7_IV0), when the controller starts up it will check if the
FeatureZNode exists
+ * and whether it is disabled. In such a case, it won’t upgrade all
features immediately.
+ * Instead it will just switch the FeatureZNode status to enabled
status. This lets the
+ * user finalize the features later.
+ *
+ * 3. Broker binary upgraded, with existing cluster IBP config >=
KAFKA_2_7_IV0:
+ * Imagine an existing Kafka cluster with IBP config >= KAFKA_2_7_IV0,
and the broker binary
+ * has just been upgraded to a newer version (that supports IBP config
KAFKA_2_7_IV0 and higher).
+ * The controller will start up and find that a FeatureZNode is already
present with enabled
+ * status and existing finalized features. In such a case, the controller
needs to scan the
+ * existing finalized features and mutate them for the purpose of version
level deprecation
+ * (if needed).
+ * This is how we handle this case: If an existing finalized feature is
present in the default
+ * finalized features, then, its existing minimum version level is
updated to the default
+ * minimum version level maintained in the BrokerFeatures object. The
goal of this mutation is
+ * to permanently deprecate one or more feature version levels. The range
of feature version
+ * levels deprecated are from the closed range:
[existing_min_version_level, default_min_version_level - 1].
+ * NOTE: Deprecating a feature version level is an incompatible change,
which requires a major
+ * release of Kafka. In such a release, the minimum version level
maintained within the
+ * BrokerFeatures class is updated suitably to record the deprecation of
the feature.
+ *
+ * 4. Broker downgrade:
+ * Imagine that a Kafka cluster exists already and the IBP config is
greater than or equal to
+ * KAFKA_2_7_IV0. Then, the user decided to downgrade the cluster by
setting IBP config to a
+ * value less than KAFKA_2_7_IV0. This means the user is also disabling
the feature versioning
+ * system (KIP-584). In this case, when the controller starts up with the
lower IBP config, it
+ * will switch the FeatureZNode status to disabled with empty features.
+ */
+ private def enableFeatureVersioning(): Unit = {
+ val defaultFinalizedFeatures = brokerFeatures.getDefaultFinalizedFeatures
+ val (mayBeFeatureZNodeBytes, version) =
zkClient.getDataAndVersion(FeatureZNode.path)
+ if (version == ZkVersion.UnknownVersion) {
+ val newVersion = createFeatureZNode(new
FeatureZNode(FeatureZNodeStatus.Enabled, defaultFinalizedFeatures))
+ featureCache.waitUntilEpochOrThrow(newVersion,
config.zkConnectionTimeoutMs)
+ } else {
+ val existingFeatureZNode =
FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+ var newFeatures: Features[FinalizedVersionRange] =
Features.emptyFinalizedFeatures()
+ if (existingFeatureZNode.status.equals(FeatureZNodeStatus.Enabled)) {
+ newFeatures =
Features.finalizedFeatures(existingFeatureZNode.features.features().asScala.map
{
+ case (featureName, existingVersionRange) =>
+ val brokerDefaultVersionRange =
defaultFinalizedFeatures.get(featureName)
+ if (brokerDefaultVersionRange == null) {
+ warn(s"Existing finalized feature: $featureName with
$existingVersionRange"
+ + s" is absent in default finalized $defaultFinalizedFeatures")
+ (featureName, existingVersionRange)
+ } else if (brokerDefaultVersionRange.max() >=
existingVersionRange.max() &&
+ brokerDefaultVersionRange.min() <=
existingVersionRange.max()) {
+ // Through this change, we deprecate all version levels in the
closed range:
+ // [existingVersionRange.min(), brokerDefaultVersionRange.min()
- 1]
+ (featureName, new
FinalizedVersionRange(brokerDefaultVersionRange.min(),
existingVersionRange.max()))
+ } else {
+ // If the existing version levels fall completely outside the
Review comment:
Are we good to proceed in this case? When there is no overlap between
the broker default features and the remote finalized features, is the current
controller still eligible?
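
For reference, the eligibility condition in the hunk above can be exercised in
isolation. This is a minimal sketch, assuming a hypothetical `LevelRange`
stand-in rather than Kafka's `FinalizedVersionRange`; it only mirrors the
quoted `else if` condition and does not settle whether the controller should
stay eligible when the ranges do not overlap.

```java
import java.util.Optional;

final class MinLevelAdjustment {
    // Hypothetical stand-in for a finalized version range; not the Kafka class.
    static final class LevelRange {
        final int min;
        final int max;

        LevelRange(int min, int max) {
            if (min < 1 || max < min) {
                throw new IllegalArgumentException(
                    "Expected 1 <= min <= max, got [" + min + ", " + max + "]");
            }
            this.min = min;
            this.max = max;
        }

        @Override
        public String toString() {
            return "[" + min + ", " + max + "]";
        }
    }

    // Mirrors the quoted condition: the broker default must cover the existing
    // max level, and the default min must not exceed the existing max level.
    static Optional<LevelRange> adjust(LevelRange existing, LevelRange brokerDefault) {
        boolean eligible =
            brokerDefault.max >= existing.max && brokerDefault.min <= existing.max;
        if (!eligible) {
            // No usable overlap: the caller would keep the existing range as is.
            return Optional.empty();
        }
        // Raise the minimum level to the broker default, keep the existing max.
        return Optional.of(new LevelRange(brokerDefault.min, existing.max));
    }

    public static void main(String[] args) {
        // Overlapping ranges: the minimum level is raised.
        System.out.println(adjust(new LevelRange(1, 3), new LevelRange(2, 4)));
        // Disjoint ranges: nothing is changed.
        System.out.println(adjust(new LevelRange(1, 2), new LevelRange(3, 5)));
    }
}
```

In the disjoint case the sketch returns an empty result, which corresponds to
the branch this comment is asking about.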
##########
File path:
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -3615,6 +3662,137 @@ public void testListOffsetsNonRetriableErrors() throws
Exception {
}
}
+ @Test
+ public void testUpdateFeaturesDuringSuccess() throws Exception {
+ testUpdateFeatures(
+ makeTestFeatureUpdates(),
+ makeTestFeatureUpdateErrors(Errors.NONE));
+ }
+
+ @Test
+ public void testUpdateFeaturesInvalidRequestError() throws Exception {
+ testUpdateFeatures(
+ makeTestFeatureUpdates(),
+ makeTestFeatureUpdateErrors(Errors.INVALID_REQUEST));
+ }
+
+ @Test
+ public void testUpdateFeaturesUpdateFailedError() throws Exception {
+ testUpdateFeatures(
+ makeTestFeatureUpdates(),
+ makeTestFeatureUpdateErrors(Errors.FEATURE_UPDATE_FAILED));
+ }
+
+ @Test
+ public void testUpdateFeaturesPartialSuccess() throws Exception {
+ final Map<String, Errors> errors =
makeTestFeatureUpdateErrors(Errors.NONE);
+ errors.put("test_feature_2", Errors.INVALID_REQUEST);
+ testUpdateFeatures(makeTestFeatureUpdates(), errors);
+ }
+
+ private Map<String, FeatureUpdate> makeTestFeatureUpdates() {
+ return Utils.mkMap(
+ Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2,
false)),
+ Utils.mkEntry("test_feature_2", new FeatureUpdate((short) 3,
true)));
+ }
+
+ private Map<String, Errors> makeTestFeatureUpdateErrors(final Errors
error) {
+ final Map<String, FeatureUpdate> updates = makeTestFeatureUpdates();
Review comment:
Could we pass `updates` in as a parameter to avoid calling
`makeTestFeatureUpdates` twice?
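
A minimal sketch of what that could look like. The `Errors` and `FeatureUpdate`
types below are hypothetical stand-ins for the Kafka classes, and the body of
`makeTestFeatureUpdateErrors` is an assumption (one error entry per feature
name), since the hunk above only shows its first line.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class FeatureUpdateFixtures {
    // Hypothetical stand-ins for Kafka's Errors and FeatureUpdate types.
    enum Errors { NONE, INVALID_REQUEST }

    static final class FeatureUpdate {
        final short maxVersionLevel;
        final boolean allowDowngrade;

        FeatureUpdate(short maxVersionLevel, boolean allowDowngrade) {
            this.maxVersionLevel = maxVersionLevel;
            this.allowDowngrade = allowDowngrade;
        }
    }

    static Map<String, FeatureUpdate> makeTestFeatureUpdates() {
        Map<String, FeatureUpdate> updates = new LinkedHashMap<>();
        updates.put("test_feature_1", new FeatureUpdate((short) 2, false));
        updates.put("test_feature_2", new FeatureUpdate((short) 3, true));
        return updates;
    }

    // The updates are passed in, so the caller builds the fixture exactly once.
    static Map<String, Errors> makeTestFeatureUpdateErrors(
            Map<String, FeatureUpdate> updates, Errors error) {
        Map<String, Errors> errors = new LinkedHashMap<>();
        for (String feature : updates.keySet()) {
            errors.put(feature, error);
        }
        return errors;
    }

    public static void main(String[] args) {
        Map<String, FeatureUpdate> updates = makeTestFeatureUpdates();
        Map<String, Errors> errors = makeTestFeatureUpdateErrors(updates, Errors.NONE);
        errors.put("test_feature_2", Errors.INVALID_REQUEST); // partial-success case
        System.out.println(errors);
    }
}
```

Each test would then create `updates` once and hand the same map to both the
error helper and `testUpdateFeatures`.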
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2956,6 +2959,37 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
+ def handleUpdateFeatures(request: RequestChannel.Request): Unit = {
+ val updateFeaturesRequest = request.body[UpdateFeaturesRequest]
+ def featureUpdateErrors(error: Errors, msgOverride: Option[String]):
Map[String, ApiError] = {
+ updateFeaturesRequest.data().featureUpdates().asScala.map(
+ update => update.feature() -> new ApiError(error,
msgOverride.getOrElse(error.message()))
+ ).toMap
+ }
+
+ def sendResponseCallback(updateErrors: Map[String, ApiError]): Unit = {
+ val results = new UpdatableFeatureResultCollection()
Review comment:
Could be moved to `UpdateFeaturesResponse` as a utility.
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -266,6 +275,179 @@ class KafkaController(val config: KafkaConfig,
}
}
+ private def createFeatureZNode(newNode: FeatureZNode): Int = {
+ info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents:
$newNode")
+ zkClient.createFeatureZNode(newNode)
+ val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+ newVersion
+ }
+
+ private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+ info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents:
$updatedNode")
+ zkClient.updateFeatureZNode(updatedNode)
+ }
+
+ /**
+ * This method enables the feature versioning system (KIP-584).
+ *
+ * Development in Kafka (from a high level) is organized into features. Each
feature is tracked by
+ * a name and a range of version numbers. A feature can be of two types:
+ *
+ * 1. Supported feature:
+ * A supported feature is represented by a name (String) and a range of
versions (defined by a
+ * {@link SupportedVersionRange}). It refers to a feature that a particular
broker advertises
+ * support for. Each broker advertises the version ranges of its own
supported features in its
+ * own BrokerIdZNode. The contents of the advertisement are specific to the
particular broker and
+ * do not represent any guarantee of a cluster-wide availability of the
feature for any particular
+ * range of versions.
+ *
+ * 2. Finalized feature:
+ * A finalized feature is represented by a name (String) and a range of
version levels (defined
+ * by a {@link FinalizedVersionRange}). Whenever the feature versioning
system (KIP-584) is
+ * enabled, the finalized features are stored in ZK in the cluster-wide
common FeatureZNode.
+ * In comparison to a supported feature, the key difference is that a
finalized feature exists
+ * in ZK only when it is guaranteed to be supported by any random broker in
the cluster for a
+ * specified range of version levels. Also, the controller is the one and
only entity modifying
+ * the information about finalized features and their version levels.
+ *
+ * This method sets up the FeatureZNode with enabled status. This status
means the feature
+ * versioning system (KIP-584) is enabled, and, the finalized features
stored in the FeatureZNode
+ * are active. This status should be written by the controller to the
FeatureZNode only when the
+ * broker IBP config is greater than or equal to KAFKA_2_7_IV0.
+ *
+ * There are multiple cases handled here:
+ *
+ * 1. New cluster bootstrap:
+ * A new Kafka cluster (i.e. it is deployed first time) is almost always
started with IBP config
+ * setting greater than or equal to KAFKA_2_7_IV0. We would like to start
the cluster with all
+ * the possible supported features finalized immediately. Assuming this
is the case, the
+ * controller will start up and notice that the FeatureZNode is absent in
the new cluster,
+ * it will then create a FeatureZNode (with enabled status) containing
the entire list of
+ * default supported features as its finalized features.
+ *
+ * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+ * Imagine there is an existing Kafka cluster with IBP config less than
KAFKA_2_7_IV0, and the
+ * Broker binary has been upgraded to a newer version that supports the
feature versioning
+ * system (KIP-584). This means the user is upgrading from an earlier
version of the Broker
+ * binary. In this case, we want to start with no finalized features and
allow the user to
+ * finalize them whenever they are ready i.e. in the future whenever the
user sets IBP config
+ * to be greater than or equal to KAFKA_2_7_IV0, then the user could
start finalizing the
+ * features. The reason to do this is that enabling all the possible
features immediately after
+ * an upgrade could be harmful to the cluster.
+ * This is how we handle such a case:
+ * - Before the IBP config upgrade (i.e. IBP config set to less than
KAFKA_2_7_IV0), the
+ * controller will start up and check if the FeatureZNode is absent.
If absent, then it
+ * will react by creating a FeatureZNode with disabled status and
empty finalized features.
+ * Otherwise, if a node already exists in enabled status then the
controller will just
+ * flip the status to disabled and clear the finalized features.
+ * - After the IBP config upgrade (i.e. IBP config set to greater than
or equal to
+ * KAFKA_2_7_IV0), when the controller starts up it will check if the
FeatureZNode exists
+ * and whether it is disabled. In such a case, it won’t upgrade all
features immediately.
+ * Instead it will just switch the FeatureZNode status to enabled
status. This lets the
+ * user finalize the features later.
+ *
+ * 3. Broker binary upgraded, with existing cluster IBP config >=
KAFKA_2_7_IV0:
+ * Imagine an existing Kafka cluster with IBP config >= KAFKA_2_7_IV0,
and the broker binary
+ * has just been upgraded to a newer version (that supports IBP config
KAFKA_2_7_IV0 and higher).
+ * The controller will start up and find that a FeatureZNode is already
present with enabled
+ * status and existing finalized features. In such a case, the controller
needs to scan the
+ * existing finalized features and mutate them for the purpose of version
level deprecation
+ * (if needed).
+ * This is how we handle this case: If an existing finalized feature is
present in the default
+ * finalized features, then, its existing minimum version level is
updated to the default
+ * minimum version level maintained in the BrokerFeatures object. The
goal of this mutation is
+ * to permanently deprecate one or more feature version levels. The range
of feature version
+ * levels deprecated are from the closed range:
[existing_min_version_level, default_min_version_level - 1].
+ * NOTE: Deprecating a feature version level is an incompatible change,
which requires a major
+ * release of Kafka. In such a release, the minimum version level
maintained within the
+ * BrokerFeatures class is updated suitably to record the deprecation of
the feature.
+ *
+ * 4. Broker downgrade:
+ * Imagine that a Kafka cluster exists already and the IBP config is
greater than or equal to
+ * KAFKA_2_7_IV0. Then, the user decided to downgrade the cluster by
setting IBP config to a
+ * value less than KAFKA_2_7_IV0. This means the user is also disabling
the feature versioning
+ * system (KIP-584). In this case, when the controller starts up with the
lower IBP config, it
+ * will switch the FeatureZNode status to disabled with empty features.
+ */
+ private def enableFeatureVersioning(): Unit = {
+ val defaultFinalizedFeatures = brokerFeatures.getDefaultFinalizedFeatures
+ val (mayBeFeatureZNodeBytes, version) =
zkClient.getDataAndVersion(FeatureZNode.path)
+ if (version == ZkVersion.UnknownVersion) {
+ val newVersion = createFeatureZNode(new
FeatureZNode(FeatureZNodeStatus.Enabled, defaultFinalizedFeatures))
+ featureCache.waitUntilEpochOrThrow(newVersion,
config.zkConnectionTimeoutMs)
+ } else {
+ val existingFeatureZNode =
FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+ var newFeatures: Features[FinalizedVersionRange] =
Features.emptyFinalizedFeatures()
+ if (existingFeatureZNode.status.equals(FeatureZNodeStatus.Enabled)) {
+ newFeatures =
Features.finalizedFeatures(existingFeatureZNode.features.features().asScala.map
{
+ case (featureName, existingVersionRange) => {
+ val brokerDefaultVersionRange =
defaultFinalizedFeatures.get(featureName)
+ if (brokerDefaultVersionRange == null) {
+ warn(s"Existing finalized feature: $featureName with
$existingVersionRange"
+ + s" is absent in default finalized $defaultFinalizedFeatures")
+ (featureName, existingVersionRange)
+ } else if (existingVersionRange.max() >=
brokerDefaultVersionRange.min() &&
+ brokerDefaultVersionRange.max() >=
existingVersionRange.max()) {
+ // Through this change, we deprecate all version levels in the
closed range:
+ // [existingVersionRange.min(), brokerDefaultVersionRange.min()
- 1]
+ (featureName, new
FinalizedVersionRange(brokerDefaultVersionRange.min(),
existingVersionRange.max()))
+ } else {
+ // If the existing version levels fall completely outside the
+ // range of the default finalized version levels (i.e. no
intersection), or, if the
+ // existing version levels are ineligible for a modification
since they are
+ // incompatible with default finalized version levels, then we
skip the update.
+ warn(s"Can not update minimum version level in finalized
feature: $featureName,"
+ + s" since the existing $existingVersionRange is not eligible
for a change"
+ + s" based on the default $brokerDefaultVersionRange.")
+ (featureName, existingVersionRange)
+ }
+ }
+ }.asJava)
+ }
+ val newFeatureZNode = new FeatureZNode(FeatureZNodeStatus.Enabled,
newFeatures)
+ if (!newFeatureZNode.equals(existingFeatureZNode)) {
Review comment:
I see, still wondering if we could just check whether `newFeatures` is
equal to `existingFeatureZNode.features`
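
One case that may be worth checking before narrowing the comparison: in the
hunk above, `newFeatures` starts out empty and the new node is always built
with enabled status, so an existing node in disabled status with empty features
would compare equal on features alone while still differing on status. A
minimal sketch of that difference, using hypothetical `Node` and `Status`
stand-ins rather than the PR's `FeatureZNode`:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

final class NodeEqualitySketch {
    enum Status { ENABLED, DISABLED }

    // Hypothetical stand-in for FeatureZNode: a status plus a feature map.
    static final class Node {
        final Status status;
        final Map<String, String> features;

        Node(Status status, Map<String, String> features) {
            this.status = status;
            this.features = features;
        }

        @Override
        public boolean equals(Object other) {
            if (this == other) {
                return true;
            }
            if (!(other instanceof Node)) {
                return false;
            }
            Node that = (Node) other;
            return status == that.status && Objects.equals(features, that.features);
        }

        @Override
        public int hashCode() {
            return Objects.hash(status, features);
        }
    }

    public static void main(String[] args) {
        Node existing = new Node(Status.DISABLED, Collections.emptyMap());
        Node updated = new Node(Status.ENABLED, Collections.emptyMap());
        // Comparing only the feature maps reports no change.
        System.out.println("features equal: " + updated.features.equals(existing.features));
        // Comparing the whole node also accounts for the status flip.
        System.out.println("nodes equal: " + updated.equals(existing));
    }
}
```

If the disabled-to-enabled flip is handled elsewhere, a features-only check
would be sufficient here.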
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1647,6 +1844,185 @@ class KafkaController(val config: KafkaConfig,
}
}
+ /**
+ * Returns the new FinalizedVersionRange for the feature, if there are no
feature
+ * incompatibilities seen with all known brokers for the provided feature
update.
+ * Otherwise returns a suitable error.
+ *
+ * @param update the feature update to be processed (this can not be meant
to delete the feature)
+ *
+ * @return the new FinalizedVersionRange or error, as described
above.
+ */
+ private def newFinalizedVersionRangeOrIncompatibilityError(update:
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange,
ApiError] = {
+ if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+ throw new IllegalArgumentException(s"Provided feature update can not be
meant to delete the feature: $update")
+ }
Review comment:
nit: new line
##########
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##########
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "apiKey": 50,
+ "type": "request",
+ "name": "UpdateFeaturesRequest",
+ "validVersions": "0",
+ "flexibleVersions": "0+",
+ "fields": [
+ { "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+",
+ "about": "The list of updates to finalized features.", "fields": [
+ {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true,
+ "about": "The name of the finalized feature to be updated."},
+ {"name": "MaxVersionLevel", "type": "int16", "versions": "0+",
+ "about": "The new maximum version level for the finalized feature. A
value >= 1 is valid. A value < 1, is special, and can be used to request the
deletion of the finalized feature."},
+ {"name": "AllowDowngrade", "type": "bool", "versions": "0+",
+ "about": "When set to true, the finalized feature version level is
allowed to be downgraded/deleted."}
Review comment:
Should we also mention that this flag would fail the request when we are
not actually doing a downgrade?
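
To make the behaviour concrete, here is a minimal sketch of the check being
described. It is an assumption about the intended semantics, not the PR's
validation code: the class and method names are hypothetical, and it presumes
the feature is currently finalized with a max version level of at least 1.

```java
import java.util.Optional;

final class AllowDowngradeCheck {
    // Returns an error message when the flag and the requested change disagree,
    // or an empty result when the request is acceptable.
    static Optional<String> validate(int existingMaxLevel, int newMaxLevel,
                                     boolean allowDowngrade) {
        // A value < 1 requests deletion, which is also below any existing level.
        boolean isDowngradeOrDeletion = newMaxLevel < existingMaxLevel;
        if (isDowngradeOrDeletion && !allowDowngrade) {
            return Optional.of("Downgrade or deletion requires AllowDowngrade to be set.");
        }
        if (!isDowngradeOrDeletion && allowDowngrade) {
            return Optional.of(
                "AllowDowngrade is set, but the request is not a downgrade or deletion.");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(validate(3, 2, true));  // downgrade with the flag set
        System.out.println(validate(3, 4, true));  // upgrade with the flag set
        System.out.println(validate(3, 0, false)); // deletion without the flag
    }
}
```

If this matches the intent, the `about` text for `AllowDowngrade` could state
the second failure mode explicitly.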
##########
File path:
clients/src/main/java/org/apache/kafka/common/errors/FeatureUpdateFailedException.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class FeatureUpdateFailedException extends ApiException {
Review comment:
Do we need to make this a public error? It seems to be used only
internally, so it could be made private if we don't intend for users to
catch it.
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -266,6 +275,178 @@ class KafkaController(val config: KafkaConfig,
}
}
+ private def createFeatureZNode(newNode: FeatureZNode): Int = {
+ info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents:
$newNode")
+ zkClient.createFeatureZNode(newNode)
+ val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+ newVersion
+ }
+
+ private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+ info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents:
$updatedNode")
+ zkClient.updateFeatureZNode(updatedNode)
+ }
+
+ /**
+ * This method enables the feature versioning system (KIP-584).
+ *
+ * Development in Kafka (from a high level) is organized into features. Each
feature is tracked by
+ * a name and a range of version numbers. A feature can be of two types:
+ *
+ * 1. Supported feature:
+ * A supported feature is represented by a name (String) and a range of
versions (defined by a
+ * {@link SupportedVersionRange}). It refers to a feature that a particular
broker advertises
+ * support for. Each broker advertises the version ranges of its own
supported features in its
+ * own BrokerIdZNode. The contents of the advertisement are specific to the
particular broker and
+ * do not represent any guarantee of a cluster-wide availability of the
feature for any particular
+ * range of versions.
+ *
+ * 2. Finalized feature:
+ * A finalized feature is represented by a name (String) and a range of
version levels (defined
+ * by a {@link FinalizedVersionRange}). Whenever the feature versioning
system (KIP-584) is
+ * enabled, the finalized features are stored in the cluster-wide common
FeatureZNode.
+ * In comparison to a supported feature, the key difference is that a
finalized feature exists
+ * in ZK only when it is guaranteed to be supported by any random broker in
the cluster for a
+ * specified range of version levels. Also, the controller is the only
entity modifying the
+ * information about finalized features.
+ *
+ * This method sets up the FeatureZNode with enabled status, which means
that the finalized
+ * features stored in the FeatureZNode are active. The enabled status should
be written by the
+ * controller to the FeatureZNode only when the broker IBP config is greater
than or equal to
+ * KAFKA_2_7_IV0.
+ *
+ * There are multiple cases handled here:
+ *
+ * 1. New cluster bootstrap:
+ * A new Kafka cluster (i.e. it is deployed first time) is almost always
started with IBP config
+ * setting greater than or equal to KAFKA_2_7_IV0. We would like to start
the cluster with all
+ * the possible supported features finalized immediately. Assuming this
is the case, the
+ * controller will start up and notice that the FeatureZNode is absent in
the new cluster,
+ * it will then create a FeatureZNode (with enabled status) containing
the entire list of
+ * default supported features as its finalized features.
+ *
+ * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+ * Imagine there is an existing Kafka cluster with IBP config less than
KAFKA_2_7_IV0, and the
+ * broker binary has been upgraded to a newer version that supports the
feature versioning
+ * system (KIP-584). This means the user is upgrading from an earlier
version of the broker
+ * binary. In this case, we want to start with no finalized features and
allow the user to
+ * finalize them whenever they are ready i.e. in the future whenever the
user sets IBP config
+ * to be greater than or equal to KAFKA_2_7_IV0, then the user could
start finalizing the
+ * features. This process ensures we do not enable all the possible
features immediately after
+ * an upgrade, which could be harmful to Kafka.
+ * This is how we handle such a case:
+ * - Before the IBP config upgrade (i.e. IBP config set to less than
KAFKA_2_7_IV0), the
+ * controller will start up and check if the FeatureZNode is absent.
If absent, it will
+ * react by creating a FeatureZNode with disabled status and empty
finalized features.
+ * Otherwise, if a node already exists in enabled status then the
controller will just
+ * flip the status to disabled and clear the finalized features.
+ * - After the IBP config upgrade (i.e. IBP config set to greater than
or equal to
+ * KAFKA_2_7_IV0), when the controller starts up it will check if the
FeatureZNode exists
+ * and whether it is disabled. In such a case, it won’t upgrade all
features immediately.
+ * Instead it will just switch the FeatureZNode status to enabled
status. This lets the
+ * user finalize the features later.
+ *
+ * 3. Broker binary upgraded, with existing cluster IBP config >=
KAFKA_2_7_IV0:
+ * Imagine an existing Kafka cluster with IBP config >= KAFKA_2_7_IV0,
and the broker binary
+ * has just been upgraded to a newer version (that supports IBP config
KAFKA_2_7_IV0 and higher).
+ * The controller will start up and find that a FeatureZNode is already
present with enabled
+ * status and existing finalized features. In such a case, the controller
needs to scan the
+ * existing finalized features and mutate them for the purpose of version
level deprecation
+ * (if needed).
+ * This is how we handle this case: If an existing finalized feature is
present in the default
+ * finalized features, then, its existing minimum version level is
updated to the default
+ * minimum version level maintained in the BrokerFeatures object. The
goal of this mutation is
+ * to permanently deprecate one or more feature version levels. The range
of feature version
+ * levels deprecated are from the closed range:
[existing_min_version_level, default_min_version_level - 1].
+ * NOTE: Deprecating a feature version level is an incompatible change,
which requires a major
+ * release of Kafka. In such a release, the minimum version level
maintained within the
+ * BrokerFeatures class is updated suitably to record the deprecation of
the feature.
+ *
+ * 4. Broker downgrade:
+ * Imagine that a Kafka cluster exists already and the IBP config is
greater than or equal to
+ * KAFKA_2_7_IV0. Then, the user decided to downgrade the cluster by
setting IBP config to a
+ * value less than KAFKA_2_7_IV0. This means the user is also disabling
the feature versioning
+ * system (KIP-584). In this case, when the controller starts up with the
lower IBP config, it
+ * will switch the FeatureZNode status to disabled with empty features.
+ */
+ private def enableFeatureVersioning(): Unit = {
+ val defaultFinalizedFeatures = brokerFeatures.getDefaultFinalizedFeatures
+ val (mayBeFeatureZNodeBytes, version) =
zkClient.getDataAndVersion(FeatureZNode.path)
+ if (version == ZkVersion.UnknownVersion) {
+ val newVersion = createFeatureZNode(new
FeatureZNode(FeatureZNodeStatus.Enabled, defaultFinalizedFeatures))
+ featureCache.waitUntilEpochOrThrow(newVersion,
config.zkConnectionTimeoutMs)
+ } else {
+ val existingFeatureZNode =
FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+ var newFeatures: Features[FinalizedVersionRange] =
Features.emptyFinalizedFeatures()
+ if (existingFeatureZNode.status.equals(FeatureZNodeStatus.Enabled)) {
+ newFeatures =
Features.finalizedFeatures(existingFeatureZNode.features.features().asScala.map
{
+ case (featureName, existingVersionRange) =>
+ val brokerDefaultVersionRange =
defaultFinalizedFeatures.get(featureName)
+ if (brokerDefaultVersionRange == null) {
+ warn(s"Existing finalized feature: $featureName with
$existingVersionRange"
+ + s" is absent in default finalized $defaultFinalizedFeatures")
+ (featureName, existingVersionRange)
+ } else if (brokerDefaultVersionRange.max() >=
existingVersionRange.max() &&
+ brokerDefaultVersionRange.min() <=
existingVersionRange.max()) {
+ // Through this change, we deprecate all version levels in the
closed range:
+ // [existingVersionRange.min(), brokerDefaultVersionRange.min()
- 1]
+ (featureName, new
FinalizedVersionRange(brokerDefaultVersionRange.min(),
existingVersionRange.max()))
+ } else {
+ // If the existing version levels fall completely outside the
+ // range of the default finalized version levels (i.e. no
intersection), or, if the
+ // existing version levels are ineligible for a modification
since they are
+ // incompatible with default finalized version levels, then we
skip the update.
+ warn(s"Can not update minimum version level in finalized
feature: $featureName,"
+ + s" since the existing $existingVersionRange is not eligible
for a change"
+ + s" based on the default $brokerDefaultVersionRange.")
+ (featureName, existingVersionRange)
+ }
+ }.asJava)
+ }
+ val newFeatureZNode = new FeatureZNode(FeatureZNodeStatus.Enabled,
newFeatures)
+ if (!newFeatureZNode.equals(existingFeatureZNode)) {
+ val newVersion = updateFeatureZNode(newFeatureZNode)
+ featureCache.waitUntilEpochOrThrow(newVersion,
config.zkConnectionTimeoutMs)
+ }
+ }
+ }
+
+ /**
+ * Disables the feature versioning system (KIP-584).
+ *
+ * Sets up the FeatureZNode with disabled status. This status means the
feature versioning system
+ * (KIP-584) is disabled, and, the finalized features stored in the
FeatureZNode are not relevant.
+ * This status should be written by the controller to the FeatureZNode only
when the broker
+ * IBP config is less than KAFKA_2_7_IV0.
+ *
+ * NOTE:
+ * 1. When this method returns, existing finalized features (if any) will be
cleared from the
+ * FeatureZNode.
+ * 2. This method, unlike enableFeatureVersioning() need not wait for the
FinalizedFeatureCache
+ * to be updated, because, such updates to the cache (via
FinalizedFeatureChangeListener)
+ * are disabled when IBP config is less than KAFKA_2_7_IV0.
+ */
+ private def disableFeatureVersioning(): Unit = {
+ val newNode = FeatureZNode(FeatureZNodeStatus.Disabled,
Features.emptyFinalizedFeatures())
+ val (mayBeFeatureZNodeBytes, version) =
zkClient.getDataAndVersion(FeatureZNode.path)
+ if (version == ZkVersion.UnknownVersion) {
+ createFeatureZNode(newNode)
Review comment:
Do we need to call `featureCache.waitUntilEpochOrThrow(newNode,
config.zkConnectionTimeoutMs)` here to ensure the update is successful?
##########
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##########
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange,
SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the following:
+ *
+ * 1. The latest features supported by the Broker.
+ *
+ * 2. The default minimum version levels for specific features. This map
enables feature
+ * version level deprecation. This is how it works: in order to deprecate
feature version levels,
+ * in this map the default minimum version level of a feature can be set to
a new value that's
+ * higher than 1 (let's call this latest_min_version_level). In doing so,
the feature version levels
+ * in the closed range: [1, latest_min_version_level - 1] get deprecated by
the controller logic
+ * that applies this map to persistent finalized feature state in ZK (this
mutation happens
+ * during controller election and during finalized feature updates via the
+ * ApiKeys.UPDATE_FINALIZED_FEATURES api). This will automatically mean
external clients of Kafka
+ * would need to stop using the finalized min version levels that have been
deprecated.
+ *
+ * This class also provides APIs to check for incompatibilities between the
features supported by
+ * the Broker and finalized features. This class is immutable in production.
It provides few APIs to
+ * mutate state only for the purpose of testing.
+ */
+class BrokerFeatures private (@volatile var supportedFeatures:
Features[SupportedVersionRange],
+ @volatile var defaultFeatureMinVersionLevels:
Map[String, Short]) {
+ require(BrokerFeatures.areFeatureMinVersionLevelsCompatible(
+ supportedFeatures, defaultFeatureMinVersionLevels))
+
+ // For testing only.
+ def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit
= {
+ require(
+ BrokerFeatures.areFeatureMinVersionLevelsCompatible(newFeatures,
defaultFeatureMinVersionLevels))
+ supportedFeatures = newFeatures
+ }
+
+ /**
+ * Returns the default minimum version level for a specific feature.
+ *
+ * @param feature the name of the feature
+ *
+ * @return the default minimum version level for the feature if its
defined.
+ * otherwise, returns 1.
+ */
+ def defaultMinVersionLevel(feature: String): Short = {
+ defaultFeatureMinVersionLevels.getOrElse(feature, 1)
+ }
+
+ // For testing only.
+ def setDefaultMinVersionLevels(newMinVersionLevels: Map[String, Short]):
Unit = {
+ require(
+ BrokerFeatures.areFeatureMinVersionLevelsCompatible(supportedFeatures,
newMinVersionLevels))
+ defaultFeatureMinVersionLevels = newMinVersionLevels
+ }
+
+ /**
+ * Returns the default finalized features that a new Kafka cluster with IBP
config >= KAFKA_2_7_IV0
+ * needs to be bootstrapped with.
+ */
+ def getDefaultFinalizedFeatures: Features[FinalizedVersionRange] = {
+ Features.finalizedFeatures(
+ supportedFeatures.features.asScala.map {
+ case(name, versionRange) => (
+ name, new FinalizedVersionRange(defaultMinVersionLevel(name),
versionRange.max))
+ }.asJava)
+ }
+
+ /**
+ * Returns the set of feature names found to be incompatible.
+ * A feature incompatibility is a version mismatch between the latest
feature supported by the
+ * Broker, and the provided finalized feature. This can happen because a
provided finalized
+ * feature:
+ * 1) Does not exist in the Broker (i.e. it is unknown to the Broker).
+ * [OR]
+ * 2) Exists but the FinalizedVersionRange does not match with the
+ * supported feature's SupportedVersionRange.
+ *
+ * @param finalized The finalized features against which incompatibilities
need to be checked for.
+ *
+ * @return The subset of input features which are incompatible.
If the returned object
+ * is empty, it means there were no feature
incompatibilities found.
+ */
+ def incompatibleFeatures(finalized: Features[FinalizedVersionRange]):
Features[FinalizedVersionRange] = {
+ BrokerFeatures.incompatibleFeatures(supportedFeatures, finalized,
logIncompatibilities = true)
+ }
+}
+
+object BrokerFeatures extends Logging {
+
+ def createDefault(): BrokerFeatures = {
+ // The arguments are currently empty, but, in the future as we define
features we should
+ // populate the required values here.
+ new BrokerFeatures(emptySupportedFeatures, Map[String, Short]())
+ }
+
+ /**
+ * Returns true if any of the provided finalized features are incompatible
with the provided
+ * supported features.
+ *
+ * @param supportedFeatures The supported features to be compared
+ * @param finalizedFeatures The finalized features to be compared
+ *
+ * @return - True if there are any feature
incompatibilities found.
+ * - False otherwise.
+ */
+ def hasIncompatibleFeatures(supportedFeatures:
Features[SupportedVersionRange],
+ finalizedFeatures:
Features[FinalizedVersionRange]): Boolean = {
+ !incompatibleFeatures(supportedFeatures, finalizedFeatures, false).empty
+ }
+
+ private def incompatibleFeatures(supportedFeatures:
Features[SupportedVersionRange],
+ finalizedFeatures:
Features[FinalizedVersionRange],
+ logIncompatibilities: Boolean):
Features[FinalizedVersionRange] = {
+ val incompatibleFeaturesInfo = finalizedFeatures.features.asScala.map {
+ case (feature, versionLevels) =>
+ val supportedVersions = supportedFeatures.get(feature)
+ if (supportedVersions == null) {
+ (feature, versionLevels, "{feature=%s, reason='Unsupported
feature'}".format(feature))
+ } else if (versionLevels.isIncompatibleWith(supportedVersions)) {
+ (feature, versionLevels, "{feature=%s, reason='%s is incompatible
with %s'}".format(
+ feature, versionLevels, supportedVersions))
+ } else {
+ (feature, versionLevels, null)
+ }
+ }.filter{ case(_, _, errorReason) => errorReason != null}.toList
+
+ if (logIncompatibilities && incompatibleFeaturesInfo.nonEmpty) {
+ warn(
+ "Feature incompatibilities seen: " + incompatibleFeaturesInfo.map {
+ case(_, _, errorReason) => errorReason })
+ }
+ Features.finalizedFeatures(incompatibleFeaturesInfo.map {
+ case(feature, versionLevels, _) => (feature, versionLevels)
}.toMap.asJava)
+ }
+
+ /**
+ * A check that ensures each feature defined with min version level is a
supported feature, and
+ * the min version level value is valid (i.e. it is compatible with the
supported version range).
+ *
+ * @param supportedFeatures the supported features
+ * @param featureMinVersionLevels the feature minimum version levels
+ *
+ * @return - true, if the above described check
passes.
+ * - false, otherwise.
+ */
+ private def areFeatureMinVersionLevelsCompatible(
+ supportedFeatures: Features[SupportedVersionRange],
+ featureMinVersionLevels: Map[String, Short]
+ ): Boolean = {
+ featureMinVersionLevels.forall {
+ case(featureName, minVersionLevel) =>
+ val supportedFeature = supportedFeatures.get(featureName)
+ (supportedFeature != null) &&
+ !new FinalizedVersionRange(minVersionLevel, supportedFeature.max())
Review comment:
Could we get a static method instead of instantiating a new
`FinalizedVersionRange` for a comparison every time?
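A minimal sketch of what such a static helper might look like, written in Java since `FinalizedVersionRange` is a Java class. The names `VersionRangeChecks` and `isIncompatible` are hypothetical, and the rule used (a finalized range is incompatible unless it lies inside the supported range) is an assumption about what `isIncompatibleWith` checks, not the PR's actual code:

```java
// Hypothetical helper, not part of the PR: compares raw version levels
// without allocating a FinalizedVersionRange for each check.
final class VersionRangeChecks {
    private VersionRangeChecks() {}

    /**
     * Assumed rule: the finalized range [finalizedMin, finalizedMax] is
     * incompatible unless it lies fully inside [supportedMin, supportedMax].
     */
    static boolean isIncompatible(short finalizedMin, short finalizedMax,
                                  short supportedMin, short supportedMax) {
        return finalizedMin < supportedMin || finalizedMax > supportedMax;
    }
}
```

`areFeatureMinVersionLevelsCompatible` could then pass `minVersionLevel` and `supportedFeature.max()` as the finalized bounds, and `supportedFeature.min()` and `supportedFeature.max()` as the supported bounds.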
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1647,6 +1844,188 @@ class KafkaController(val config: KafkaConfig,
}
}
+ /**
+ * Returns the new FinalizedVersionRange for the feature, if there are no
feature
+ * incompatibilities seen with all known brokers for the provided feature
update.
+ * Otherwise returns a suitable error.
Review comment:
State the error explicitly here.
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1647,6 +1844,188 @@ class KafkaController(val config: KafkaConfig,
}
}
+ /**
+ * Returns the new FinalizedVersionRange for the feature, if there are no
feature
+ * incompatibilities seen with all known brokers for the provided feature
update.
+ * Otherwise returns a suitable error.
+ *
+ * @param update the feature update to be processed (this can not be meant
to delete the feature)
+ *
+ * @return the new FinalizedVersionRange or error, as described
above.
+ */
+ private def newFinalizedVersionRangeOrIncompatibilityError(update:
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange,
ApiError] = {
+ if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+ throw new IllegalArgumentException(s"Provided feature update can not be
meant to delete the feature: $update")
+ }
+ // NOTE: Below we set the finalized min version level to be the default
minimum version
+ // level. If the finalized feature already exists, then, this can cause
deprecation of all
+ // version levels in the closed range:
+ // [existingVersionRange.min(), defaultMinVersionLevel - 1].
+ val defaultMinVersionLevel =
brokerFeatures.defaultMinVersionLevel(update.feature)
+ val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel,
update.maxVersionLevel)
+ val numIncompatibleBrokers =
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+ val singleFinalizedFeature =
+ Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature,
newVersionRange)))
+ BrokerFeatures.hasIncompatibleFeatures(broker.features,
singleFinalizedFeature)
+ })
+ if (numIncompatibleBrokers == 0) {
+ Left(newVersionRange)
+ } else {
+ Right(
+ new ApiError(Errors.INVALID_REQUEST,
+ s"Could not apply finalized feature update because
$numIncompatibleBrokers" +
+ " brokers were found to have incompatible features."))
+ }
+ }
+
+ /**
+ * Validate and process a finalized feature update.
+ *
+ * If the processing is successful, then, the return value contains:
+ * 1. the new FinalizedVersionRange for the feature, if the feature update
was not meant to delete the feature.
+ * 2. Option.empty, if the feature update was meant to delete the feature.
+ *
+ * If the processing failed, then returned value contains a suitable
ApiError.
+ *
+ * @param update the feature update to be processed.
+ *
+ * @return the new FinalizedVersionRange or error, as described
above.
+ */
+ private def processFeatureUpdate(update:
UpdateFeaturesRequestData.FeatureUpdateKey):
Either[Option[FinalizedVersionRange], ApiError] = {
+ val existingFeatures = featureCache.get
+ .map(featuresAndEpoch => featuresAndEpoch.features.features().asScala)
+ .getOrElse(Map[String, FinalizedVersionRange]())
+
+ def newVersionRangeOrError(update:
UpdateFeaturesRequestData.FeatureUpdateKey):
Either[Option[FinalizedVersionRange], ApiError] = {
+ newFinalizedVersionRangeOrIncompatibilityError(update)
+ .fold(versionRange => Left(Some(versionRange)), error => Right(error))
+ }
+
+ if (update.feature.isEmpty) {
+ // Check that the feature name is not empty.
+ Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be
empty."))
+ } else {
+ val cacheEntry = existingFeatures.get(update.feature).orNull
+
+ // We handle deletion requests separately from non-deletion requests.
+ if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+ if (cacheEntry == null) {
+ // Disallow deletion of a non-existing finalized feature.
+ Right(new ApiError(Errors.INVALID_REQUEST,
+ s"Can not delete non-existing finalized feature:
'${update.feature}'"))
+ } else {
+ Left(Option.empty)
+ }
+ } else if (update.maxVersionLevel() < 1) {
+ // Disallow deletion of a finalized feature without allowDowngrade
flag set.
+ Right(new ApiError(Errors.INVALID_REQUEST,
+ s"Can not provide maxVersionLevel:
${update.maxVersionLevel} less" +
+ s" than 1 for feature: '${update.feature}' without
setting the" +
+ " allowDowngrade flag to true in the request."))
+ } else {
+ if (cacheEntry == null) {
+ newVersionRangeOrError(update)
+ } else {
+ if (update.maxVersionLevel == cacheEntry.max()) {
+ // Disallow a case where target maxVersionLevel matches existing
maxVersionLevel.
+ Right(new ApiError(Errors.INVALID_REQUEST,
+ s"Can not ${if (update.allowDowngrade)
"downgrade" else "upgrade"}" +
+ s" a finalized feature: '${update.feature}'
from existing" +
+ s" maxVersionLevel:${cacheEntry.max} to the
same value."))
+ } else if (update.maxVersionLevel < cacheEntry.max &&
!update.allowDowngrade) {
+ // Disallow downgrade of a finalized feature without the
allowDowngrade flag set.
+ Right(new ApiError(Errors.INVALID_REQUEST,
+ s"Can not downgrade finalized feature:
'${update.feature}' from" +
+ s" existing maxVersionLevel:${cacheEntry.max}
to provided" +
+ s" maxVersionLevel:${update.maxVersionLevel}
without setting the" +
+ " allowDowngrade flag in the request."))
+ } else if (update.allowDowngrade && update.maxVersionLevel >
cacheEntry.max) {
Review comment:
I'm actually wondering whether this is too strict from the perspective of
a user. If they accidentally set a feature version larger than the one in the
cache, all they care about is being able to change the version to it. So it's a
matter of whether we think this is a user error, or whether this could happen
when the user gets stale feature information from a broker while the downgrade
has already succeeded.
If we want to keep this check, it makes sense to update the meta comments
around `allowDowngrade` to inform the user that the request could fail when the
target version is actually higher than the current finalized feature.
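To make the discussion concrete, here is a standalone Java restatement of the non-deletion checks as they appear across the quoted revisions of `processFeatureUpdate` (the `maxVersionLevel < 1` check and the broker compatibility check are left out). `UpdateChecks` and `validate` are hypothetical names used only for illustration, and the error strings are shortened. The third branch is the one in question:

```java
import java.util.Optional;

// Hypothetical illustration only; mirrors the order of the checks in the diff.
final class UpdateChecks {
    private UpdateChecks() {}

    /** Returns an error message, or empty if these checks would accept the update. */
    static Optional<String> validate(short targetMax, boolean allowDowngrade,
                                     short existingMin, short existingMax) {
        if (targetMax == existingMax) {
            return Optional.of("target maxVersionLevel equals existing maxVersionLevel");
        } else if (targetMax < existingMax && !allowDowngrade) {
            return Optional.of("downgrade requires the allowDowngrade flag");
        } else if (allowDowngrade && targetMax > existingMax) {
            // The check under discussion: an upgrade sent with allowDowngrade set.
            return Optional.of("allowDowngrade set but target is above existing maxVersionLevel");
        } else if (targetMax < existingMin) {
            return Optional.of("target is below existing minVersionLevel");
        }
        return Optional.empty();
    }
}
```

With an existing range of [1, 3], `validate(4, true, 1, 3)` is rejected while `validate(4, false, 1, 3)` passes these checks, which is the asymmetry a user holding stale feature information could run into.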
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1647,6 +1844,185 @@ class KafkaController(val config: KafkaConfig,
}
}
+ /**
+ * Returns the new FinalizedVersionRange for the feature, if there are no
feature
+ * incompatibilities seen with all known brokers for the provided feature
update.
+ * Otherwise returns a suitable error.
+ *
+ * @param update the feature update to be processed (this can not be meant
to delete the feature)
+ *
+ * @return the new FinalizedVersionRange or error, as described
above.
+ */
+ private def newFinalizedVersionRangeOrIncompatibilityError(update:
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange,
ApiError] = {
+ if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+ throw new IllegalArgumentException(s"Provided feature update can not be
meant to delete the feature: $update")
+ }
+ // NOTE: Below we set the finalized min version level to be the default
minimum version
+ // level. If the finalized feature already exists, then, this can cause
deprecation of all
+ // version levels in the closed range:
+ // [existingVersionRange.min(), defaultMinVersionLevel - 1].
+ val defaultMinVersionLevel =
brokerFeatures.defaultMinVersionLevel(update.feature)
+ val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel,
update.maxVersionLevel)
+ val numIncompatibleBrokers =
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+ val singleFinalizedFeature =
+ Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature,
newVersionRange)))
+ BrokerFeatures.hasIncompatibleFeatures(broker.features,
singleFinalizedFeature)
+ })
+ if (numIncompatibleBrokers == 0) {
+ Left(newVersionRange)
+ } else {
+ Right(
+ new ApiError(Errors.INVALID_REQUEST,
+ s"Could not apply finalized feature update because
$numIncompatibleBrokers" +
+ " brokers were found to have incompatible features."))
+ }
+ }
+
+ /**
+ * Validate and process a finalized feature update on an existing
FinalizedVersionRange for the
+ * feature.
+ *
+ * If the processing is successful, then, the return value contains:
+ * 1. the new FinalizedVersionRange for the feature, if the feature update
was not meant to delete the feature.
+ * 2. Option.empty, if the feature update was meant to delete the feature.
+ *
+ * If the processing failed, then returned value contains a suitable
ApiError.
+ *
+ * @param update the feature update to be processed.
+ * @param existingVersionRange the existing FinalizedVersionRange which
can be empty when no
+ * FinalizedVersionRange exists for the
associated feature
+ *
+ * @return the new FinalizedVersionRange or error, as
described above.
+ */
+ private def processFeatureUpdate(update:
UpdateFeaturesRequestData.FeatureUpdateKey,
+ existingVersionRange:
Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError]
= {
+ def newVersionRangeOrError(update:
UpdateFeaturesRequestData.FeatureUpdateKey):
Either[Option[FinalizedVersionRange], ApiError] = {
+ newFinalizedVersionRangeOrIncompatibilityError(update)
+ .fold(versionRange => Left(Some(versionRange)), error => Right(error))
+ }
+
+ if (update.feature.isEmpty) {
+ // Check that the feature name is not empty.
+ Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be
empty."))
+ } else {
+ // We handle deletion requests separately from non-deletion requests.
+ if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+ if (existingVersionRange.isEmpty) {
+ // Disallow deletion of a non-existing finalized feature.
+ Right(new ApiError(Errors.INVALID_REQUEST,
+ s"Can not delete non-existing finalized feature:
'${update.feature}'"))
+ } else {
+ Left(Option.empty)
+ }
+ } else if (update.maxVersionLevel() < 1) {
Review comment:
Is this case covered by the case on L1931? Could we merge both?
##########
File path: core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala
##########
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange,
SupportedVersionRange}
+import org.junit.Assert.{assertEquals, assertThrows, assertTrue}
+import org.junit.Test
+
+import scala.jdk.CollectionConverters._
+
+class BrokerFeaturesTest {
Review comment:
Some methods in `BrokerFeatures` are not covered by this suite, such
as `defaultMinVersionLevel`, `getDefaultFinalizedFeatures` and
`hasIncompatibleFeatures`. You could use a code coverage tool to find any
missing parts.
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1395,7 +1596,7 @@ class KafkaController(val config: KafkaConfig,
if (newMetadataOpt.nonEmpty && oldMetadataOpt.nonEmpty) {
val oldMetadata = oldMetadataOpt.get
val newMetadata = newMetadataOpt.get
- if (newMetadata.endPoints != oldMetadata.endPoints) {
+ if (newMetadata.endPoints != oldMetadata.endPoints ||
!oldMetadata.features.equals(newMetadata.features)) {
Review comment:
I see. Still, I'm a bit worried that future changes could break this
assumption. Wouldn't it be a good idea to also check `features != null`?
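For illustration, a null-safe comparison could look like the following Java sketch. `FeatureCompare` and `featuresChanged` are hypothetical names and not code from the PR; `java.util.Objects.equals` is the standard library call, and it treats two nulls as equal:

```java
import java.util.Objects;

// Hypothetical illustration only; not code from the PR.
final class FeatureCompare {
    private FeatureCompare() {}

    /** True when the two values differ; never throws if either side is null. */
    static <T> boolean featuresChanged(T oldFeatures, T newFeatures) {
        return !Objects.equals(oldFeatures, newFeatures);
    }
}
```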
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1647,6 +1844,185 @@ class KafkaController(val config: KafkaConfig,
}
}
+ /**
+ * Returns the new FinalizedVersionRange for the feature, if there are no
feature
+ * incompatibilities seen with all known brokers for the provided feature
update.
+ * Otherwise returns a suitable error.
+ *
+ * @param update the feature update to be processed (this can not be meant
to delete the feature)
+ *
+ * @return the new FinalizedVersionRange or error, as described
above.
+ */
+ private def newFinalizedVersionRangeOrIncompatibilityError(update:
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange,
ApiError] = {
+ if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+ throw new IllegalArgumentException(s"Provided feature update can not be
meant to delete the feature: $update")
+ }
+ // NOTE: Below we set the finalized min version level to be the default
minimum version
+ // level. If the finalized feature already exists, then, this can cause
deprecation of all
+ // version levels in the closed range:
+ // [existingVersionRange.min(), defaultMinVersionLevel - 1].
+ val defaultMinVersionLevel =
brokerFeatures.defaultMinVersionLevel(update.feature)
+ val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel,
update.maxVersionLevel)
+ val numIncompatibleBrokers =
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+ val singleFinalizedFeature =
+ Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature,
newVersionRange)))
+ BrokerFeatures.hasIncompatibleFeatures(broker.features,
singleFinalizedFeature)
+ })
+ if (numIncompatibleBrokers == 0) {
+ Left(newVersionRange)
+ } else {
+ Right(
+ new ApiError(Errors.INVALID_REQUEST,
+ s"Could not apply finalized feature update because
$numIncompatibleBrokers" +
+ " brokers were found to have incompatible features."))
+ }
+ }
+
+ /**
+ * Validate and process a finalized feature update on an existing
FinalizedVersionRange for the
+ * feature.
+ *
+ * If the processing is successful, then, the return value contains:
+ * 1. the new FinalizedVersionRange for the feature, if the feature update
was not meant to delete the feature.
+ * 2. Option.empty, if the feature update was meant to delete the feature.
+ *
+ * If the processing failed, then returned value contains a suitable
ApiError.
+ *
+ * @param update the feature update to be processed.
+ * @param existingVersionRange the existing FinalizedVersionRange which
can be empty when no
+ * FinalizedVersionRange exists for the
associated feature
+ *
+ * @return the new FinalizedVersionRange or error, as
described above.
+ */
+ private def processFeatureUpdate(update:
UpdateFeaturesRequestData.FeatureUpdateKey,
+ existingVersionRange:
Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError]
= {
+ def newVersionRangeOrError(update:
UpdateFeaturesRequestData.FeatureUpdateKey):
Either[Option[FinalizedVersionRange], ApiError] = {
+ newFinalizedVersionRangeOrIncompatibilityError(update)
+ .fold(versionRange => Left(Some(versionRange)), error => Right(error))
+ }
+
+ if (update.feature.isEmpty) {
+ // Check that the feature name is not empty.
+ Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be
empty."))
+ } else {
+ // We handle deletion requests separately from non-deletion requests.
+ if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+ if (existingVersionRange.isEmpty) {
+ // Disallow deletion of a non-existing finalized feature.
+ Right(new ApiError(Errors.INVALID_REQUEST,
+ s"Can not delete non-existing finalized feature:
'${update.feature}'"))
+ } else {
+ Left(Option.empty)
+ }
+ } else if (update.maxVersionLevel() < 1) {
+ // Disallow deletion of a finalized feature without allowDowngrade
flag set.
+ Right(new ApiError(Errors.INVALID_REQUEST,
+ s"Can not provide maxVersionLevel:
${update.maxVersionLevel} less" +
+ s" than 1 for feature: '${update.feature}' without
setting the" +
+ " allowDowngrade flag to true in the request."))
+ } else {
+ existingVersionRange.map(existing =>
+ if (update.maxVersionLevel == existing.max) {
+ // Disallow a case where target maxVersionLevel matches existing
maxVersionLevel.
+ Right(new ApiError(Errors.INVALID_REQUEST,
+ s"Can not ${if (update.allowDowngrade) "downgrade" else
"upgrade"}" +
+ s" a finalized feature: '${update.feature}' from existing" +
+ s" maxVersionLevel:${existing.max} to the same value."))
+ } else if (update.maxVersionLevel < existing.max &&
!update.allowDowngrade) {
+ // Disallow downgrade of a finalized feature without the
allowDowngrade flag set.
+ Right(new ApiError(Errors.INVALID_REQUEST,
+ s"Can not downgrade finalized feature: '${update.feature}' from"
+
+ s" existing maxVersionLevel:${existing.max} to provided" +
+ s" maxVersionLevel:${update.maxVersionLevel} without setting
the" +
+ " allowDowngrade flag in the request."))
+ } else if (update.allowDowngrade && update.maxVersionLevel >
existing.max) {
+ // Disallow a request that sets allowDowngrade flag without
specifying a
+ // maxVersionLevel that's lower than the existing maxVersionLevel.
+ Right(new ApiError(Errors.INVALID_REQUEST,
+ s"When finalized feature: '${update.feature}' has the
allowDowngrade" +
+ " flag set in the request, the provided" +
+ s" maxVersionLevel:${update.maxVersionLevel} can not be
greater than" +
+ s" existing maxVersionLevel:${existing.max}."))
+ } else if (update.maxVersionLevel() < existing.min) {
+ // Disallow downgrade of a finalized feature below the existing
finalized
+ // minVersionLevel.
+ Right(new ApiError(Errors.INVALID_REQUEST,
+ s"Can not downgrade finalized feature: '${update.feature}' to" +
+ s" maxVersionLevel:${update.maxVersionLevel} because it's
lower than" +
+ s" the existing minVersionLevel:${existing.min}."))
+ } else {
+ newVersionRangeOrError(update)
+ }
+ ).getOrElse(newVersionRangeOrError(update))
+ }
+ }
+ }
+
+ private def processFeatureUpdates(request: UpdateFeaturesRequest,
+ callback: UpdateFeaturesCallback): Unit = {
+ if (isActive) {
+ processFeatureUpdatesWithActiveController(request, callback)
+ } else {
+ val results = request.data().featureUpdates().asScala.map {
+ update => update.feature() -> new ApiError(Errors.NOT_CONTROLLER)
+ }.toMap
+ callback(results)
+ }
+ }
+
+ private def processFeatureUpdatesWithActiveController(request:
UpdateFeaturesRequest,
+ callback:
UpdateFeaturesCallback): Unit = {
+ val updates = request.data.featureUpdates
+ val existingFeatures = featureCache.get
+ .map(featuresAndEpoch => featuresAndEpoch.features.features().asScala)
+ .getOrElse(Map[String, FinalizedVersionRange]())
+ // Map of feature to FinalizedVersionRange. This contains the target
features to be eventually
+ // written to FeatureZNode.
+ val targetFeatures = scala.collection.mutable.Map[String,
FinalizedVersionRange]() ++ existingFeatures
+ // Map of feature to error.
+ var errors = scala.collection.mutable.Map[String, ApiError]()
+
+ // Process each FeatureUpdate.
+ // If a FeatureUpdate is found to be valid, then the corresponding entry
in errors would contain
+ // Errors.NONE. Otherwise the entry would contain the appropriate error.
+ updates.asScala.iterator.foreach { update =>
+ processFeatureUpdate(update, existingFeatures.get(update.feature()))
match {
+ case Left(newVersionRangeOrNone) =>
+ newVersionRangeOrNone
+ .map(newVersionRange => targetFeatures += (update.feature() ->
newVersionRange))
+ .getOrElse(targetFeatures -= update.feature())
+ errors += (update.feature() -> new ApiError(Errors.NONE))
+ case Right(featureUpdateFailureReason) =>
+ errors += (update.feature() -> featureUpdateFailureReason)
+ }
+ }
+
+ if (existingFeatures.equals(targetFeatures)) {
Review comment:
Could you clarify the reasoning here? If the structs are not the same, are
we going to do a partial update?
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1647,6 +1844,185 @@ class KafkaController(val config: KafkaConfig,
}
}
+ /**
+ * Returns the new FinalizedVersionRange for the feature, if there are no
feature
+ * incompatibilities seen with all known brokers for the provided feature
update.
+ * Otherwise returns a suitable error.
+ *
+ * @param update the feature update to be processed (this can not be meant
to delete the feature)
+ *
+ * @return the new FinalizedVersionRange or error, as described
above.
+ */
+ private def newFinalizedVersionRangeOrIncompatibilityError(update:
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange,
ApiError] = {
+ if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+ throw new IllegalArgumentException(s"Provided feature update can not be
meant to delete the feature: $update")
+ }
+ // NOTE: Below we set the finalized min version level to be the default
minimum version
+ // level. If the finalized feature already exists, then, this can cause
deprecation of all
+ // version levels in the closed range:
+ // [existingVersionRange.min(), defaultMinVersionLevel - 1].
+ val defaultMinVersionLevel =
brokerFeatures.defaultMinVersionLevel(update.feature)
+ val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel,
update.maxVersionLevel)
+ val numIncompatibleBrokers =
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+ val singleFinalizedFeature =
+ Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature,
newVersionRange)))
+ BrokerFeatures.hasIncompatibleFeatures(broker.features,
singleFinalizedFeature)
+ })
+ if (numIncompatibleBrokers == 0) {
+ Left(newVersionRange)
+ } else {
+ Right(
+ new ApiError(Errors.INVALID_REQUEST,
+ s"Could not apply finalized feature update because
$numIncompatibleBrokers" +
+ " brokers were found to have incompatible features."))
+ }
+ }
+
+ /**
+ * Validate and process a finalized feature update on an existing
FinalizedVersionRange for the
+ * feature.
+ *
+ * If the processing is successful, then, the return value contains:
+ * 1. the new FinalizedVersionRange for the feature, if the feature update
was not meant to delete the feature.
+ * 2. Option.empty, if the feature update was meant to delete the feature.
+ *
+ * If the processing failed, then returned value contains a suitable
ApiError.
+ *
+ * @param update the feature update to be processed.
+ * @param existingVersionRange the existing FinalizedVersionRange which
can be empty when no
+ * FinalizedVersionRange exists for the
associated feature
+ *
+ * @return the new FinalizedVersionRange or error, as
described above.
+ */
+ private def processFeatureUpdate(update:
UpdateFeaturesRequestData.FeatureUpdateKey,
+ existingVersionRange:
Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError]
= {
+ def newVersionRangeOrError(update:
UpdateFeaturesRequestData.FeatureUpdateKey):
Either[Option[FinalizedVersionRange], ApiError] = {
+ newFinalizedVersionRangeOrIncompatibilityError(update)
+ .fold(versionRange => Left(Some(versionRange)), error => Right(error))
+ }
+
+ if (update.feature.isEmpty) {
+ // Check that the feature name is not empty.
+ Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be
empty."))
+ } else {
+ // We handle deletion requests separately from non-deletion requests.
+ if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+ if (existingVersionRange.isEmpty) {
+ // Disallow deletion of a non-existing finalized feature.
+ Right(new ApiError(Errors.INVALID_REQUEST,
+ s"Can not delete non-existing finalized feature:
'${update.feature}'"))
+ } else {
+ Left(Option.empty)
+ }
+ } else if (update.maxVersionLevel() < 1) {
+ // Disallow deletion of a finalized feature without allowDowngrade
flag set.
+ Right(new ApiError(Errors.INVALID_REQUEST,
+ s"Can not provide maxVersionLevel:
${update.maxVersionLevel} less" +
+ s" than 1 for feature: '${update.feature}' without
setting the" +
+ " allowDowngrade flag to true in the request."))
+ } else {
+ existingVersionRange.map(existing =>
+ if (update.maxVersionLevel == existing.max) {
+ // Disallow a case where target maxVersionLevel matches existing
maxVersionLevel.
+ Right(new ApiError(Errors.INVALID_REQUEST,
+ s"Can not ${if (update.allowDowngrade) "downgrade" else
"upgrade"}" +
+ s" a finalized feature: '${update.feature}' from existing" +
+ s" maxVersionLevel:${existing.max} to the same value."))
+ } else if (update.maxVersionLevel < existing.max &&
!update.allowDowngrade) {
+ // Disallow downgrade of a finalized feature without the
allowDowngrade flag set.
+ Right(new ApiError(Errors.INVALID_REQUEST,
+ s"Can not downgrade finalized feature: '${update.feature}' from"
+
+ s" existing maxVersionLevel:${existing.max} to provided" +
+ s" maxVersionLevel:${update.maxVersionLevel} without setting
the" +
+ " allowDowngrade flag in the request."))
+ } else if (update.allowDowngrade && update.maxVersionLevel >
existing.max) {
+ // Disallow a request that sets allowDowngrade flag without
specifying a
+ // maxVersionLevel that's lower than the existing maxVersionLevel.
+ Right(new ApiError(Errors.INVALID_REQUEST,
+ s"When finalized feature: '${update.feature}' has the
allowDowngrade" +
+ " flag set in the request, the provided" +
+ s" maxVersionLevel:${update.maxVersionLevel} can not be
greater than" +
+ s" existing maxVersionLevel:${existing.max}."))
+ } else if (update.maxVersionLevel() < existing.min) {
Review comment:
We should be consistent and remove the `()` from `maxVersionLevel`.
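As a minimal sketch of why the parentheses are optional: Scala lets a zero-argument accessor defined in Java be called with or without `()`, and both forms return the same value. The example uses `java.util.ArrayList#size` as a stand-in for the generated request accessor. `ParenStyleSketch` and `names` are made up for illustration and are not part of the PR.

```scala
import java.util.{ArrayList => JArrayList}

// Illustrative only: ArrayList#size stands in for a Java-defined
// zero-argument accessor such as the one discussed in the comment above.
object ParenStyleSketch {
  def main(args: Array[String]): Unit = {
    val names = new JArrayList[String]()
    names.add("feature_1")

    // Both call styles compile and invoke the same Java method.
    val withParens = names.size()
    val withoutParens = names.size

    println(withParens == withoutParens)
  }
}
```

Since the two forms are equivalent, picking one within a method is purely a readability choice.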
##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -39,7 +42,7 @@ case class FinalizedFeaturesAndEpoch(features:
Features[FinalizedVersionRange],
*
* @see FinalizedFeatureChangeListener
*/
-object FinalizedFeatureCache extends Logging {
+class FinalizedFeatureCache(private val brokerFeatures: BrokerFeatures)
extends Logging {
Review comment:
The class-level comment for `FinalizedFeatureCache` should be updated, since
the cache is now accessed for both reads and writes.
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2945,6 +2948,130 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
+ def handleUpdateFeatures(request: RequestChannel.Request): Unit = {
Review comment:
This does not seem to be covered yet.
##########
File path:
core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala
##########
@@ -78,25 +76,42 @@ class FinalizedFeatureChangeListenerTest extends
ZooKeeperTestHarness {
/**
* Tests that the listener can be initialized, and that it can listen to ZK
notifications
* successfully from an "Enabled" FeatureZNode (the ZK data has no feature
incompatibilities).
+ * Particularly the test checks if multiple notifications can be processed
in ZK
+ * (i.e. whether the FeatureZNode watch can be re-established).
*/
@Test
def testInitSuccessAndNotificationSuccess(): Unit = {
- createSupportedFeatures()
val initialFinalizedFeatures = createFinalizedFeatures()
- val listener = createListener(Some(initialFinalizedFeatures))
+ val brokerFeatures = createBrokerFeatures()
+ val cache = new FinalizedFeatureCache(brokerFeatures)
+ val listener = createListener(cache, Some(initialFinalizedFeatures))
- val updatedFinalizedFeaturesMap = Map[String, FinalizedVersionRange](
- "feature_1" -> new FinalizedVersionRange(2, 4))
- val updatedFinalizedFeatures =
Features.finalizedFeatures(updatedFinalizedFeaturesMap.asJava)
- zkClient.updateFeatureZNode(FeatureZNode(FeatureZNodeStatus.Enabled,
updatedFinalizedFeatures))
- val (mayBeFeatureZNodeNewBytes, updatedVersion) =
zkClient.getDataAndVersion(FeatureZNode.path)
- assertNotEquals(updatedVersion, ZkVersion.UnknownVersion)
- assertFalse(mayBeFeatureZNodeNewBytes.isEmpty)
- assertTrue(updatedVersion > initialFinalizedFeatures.epoch)
- TestUtils.waitUntilTrue(() => {
-
FinalizedFeatureCache.get.get.equals(FinalizedFeaturesAndEpoch(updatedFinalizedFeatures,
updatedVersion))
- }, "Timed out waiting for FinalizedFeatureCache to be updated with new
features")
- assertTrue(listener.isListenerInitiated)
+ def updateAndCheckCache(finalizedFeatures:
Features[FinalizedVersionRange]): Unit = {
+ zkClient.updateFeatureZNode(FeatureZNode(FeatureZNodeStatus.Enabled,
finalizedFeatures))
+ val (mayBeFeatureZNodeNewBytes, updatedVersion) =
zkClient.getDataAndVersion(FeatureZNode.path)
+ assertNotEquals(updatedVersion, ZkVersion.UnknownVersion)
+ assertFalse(mayBeFeatureZNodeNewBytes.isEmpty)
+ assertTrue(updatedVersion > initialFinalizedFeatures.epoch)
+
+ cache.waitUntilEpochOrThrow(updatedVersion,
JTestUtils.DEFAULT_MAX_WAIT_MS)
+ assertEquals(FinalizedFeaturesAndEpoch(finalizedFeatures,
updatedVersion), cache.get.get)
+ assertTrue(listener.isListenerInitiated)
+ }
+
+ // Check if the write succeeds and a ZK notification is received that
causes the feature cache
+ // to be populated.
+ updateAndCheckCache(
+ Features.finalizedFeatures(
+ Map[String, FinalizedVersionRange](
+ "feature_1" -> new FinalizedVersionRange(2, 4)).asJava))
Review comment:
Indentation is not right.
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -109,7 +109,9 @@ class KafkaApis(val requestChannel: RequestChannel,
brokerTopicStats: BrokerTopicStats,
val clusterId: String,
time: Time,
- val tokenManager: DelegationTokenManager) extends Logging {
+ val tokenManager: DelegationTokenManager,
+ val brokerFeatures: BrokerFeatures,
Review comment:
Could we pass in only `featureCache` to reduce class coupling here? Since
`FinalizedFeatureCache` already holds `brokerFeatures` as a private parameter,
it shouldn't be too hard to add a helper there that returns the supported
features.
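A rough sketch of what this suggestion could look like, using simplified stand-in classes rather than the real Kafka ones. `VersionRange`, the `supportedFeatures` helper on the cache, and `supportedFeatureNames` are hypothetical names chosen for illustration. The actual types and constructor signatures in the PR differ.

```scala
// Simplified, hypothetical stand-ins; these are NOT the real Kafka classes.
final case class VersionRange(min: Int, max: Int)

class BrokerFeatures(val supportedFeatures: Map[String, VersionRange])

class FinalizedFeatureCache(private val brokerFeatures: BrokerFeatures) {
  // Hypothetical helper: exposes supported features through the cache so that
  // callers do not need their own reference to BrokerFeatures.
  def supportedFeatures: Map[String, VersionRange] =
    brokerFeatures.supportedFeatures
}

// The API layer depends only on the cache, not on BrokerFeatures directly.
class KafkaApis(val featureCache: FinalizedFeatureCache) {
  def supportedFeatureNames: Set[String] =
    featureCache.supportedFeatures.keySet
}

object CouplingSketch {
  def main(args: Array[String]): Unit = {
    val features = new BrokerFeatures(Map("feature_1" -> VersionRange(1, 4)))
    val apis = new KafkaApis(new FinalizedFeatureCache(features))
    println(apis.supportedFeatureNames)
  }
}
```

With this shape, `KafkaApis` takes one fewer constructor parameter, at the cost of the cache acting as a thin facade over `BrokerFeatures`.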