abbccdda commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r462453975
########## File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java ########## @@ -1214,6 +1215,71 @@ default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlterati */ AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options); + /** + * Describes finalized as well as supported features. By default, the request is issued to any + * broker. It can be optionally directed only to the controller via DescribeFeaturesOptions + * parameter. This is particularly useful if the user requires strongly consistent reads of + * finalized features. + * <p> + * The following exceptions can be anticipated when calling {@code get()} on the future from the + * returned {@link DescribeFeaturesResult}: + * <ul> + * <li>{@link org.apache.kafka.common.errors.TimeoutException} + * If the request timed out before the describe operation could finish.</li> + * </ul> + * <p> + * @param options the options to use + * + * @return the {@link DescribeFeaturesResult} containing the result + */ + DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options); Review comment: Note in the post-KIP-500 world, this feature could still work, but the request must be redirected to the controller inherently on the broker side, instead of sending it directly. So in the comment, we may try to phrase it to convey the principal is that `the request must be handled by the controller` instead of `the admin client must send this request to the controller`. ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java ########## @@ -1214,6 +1215,70 @@ default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlterati */ AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options); + /** + * Describes finalized as well as supported features. By default, the request is issued to any + * broker. It can be optionally directed only to the controller via DescribeFeaturesOptions + * parameter. This is particularly useful if the user requires strongly consistent reads of + * finalized features. + * <p> + * The following exceptions can be anticipated when calling {@code get()} on the future from the + * returned {@link DescribeFeaturesResult}: + * <ul> + * <li>{@link org.apache.kafka.common.errors.TimeoutException} + * If the request timed out before the describe operation could finish.</li> + * </ul> + * <p> + * @param options the options to use + * + * @return the {@link DescribeFeaturesResult} containing the result + */ + DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options); + + /** + * Applies specified updates to finalized features. This operation is not transactional so it + * may succeed for some features while fail for others. + * <p> + * The API takes in a map of finalized feature name to {@link FeatureUpdate} that need to be Review comment: nit: s/name/names ########## File path: core/src/main/scala/kafka/controller/KafkaController.scala ########## @@ -983,8 +1144,25 @@ class KafkaController(val config: KafkaConfig, */ private[controller] def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicPartition]): Unit = { try { + val filteredBrokers = scala.collection.mutable.Set[Int]() ++ brokers + if (config.isFeatureVersioningEnabled) { + def hasIncompatibleFeatures(broker: Broker): Boolean = { + val latestFinalizedFeatures = featureCache.get + if (latestFinalizedFeatures.isDefined) { + BrokerFeatures.hasIncompatibleFeatures(broker.features, latestFinalizedFeatures.get.features) + } else { + false + } + } + controllerContext.liveOrShuttingDownBrokers.foreach(broker => { + if (filteredBrokers.contains(broker.id) && hasIncompatibleFeatures(broker)) { Review comment: I see, what would happen to a currently live broker if it couldn't get any metadata update for a while, will it shut down itself? ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * Options for {@link AdminClient#describeFeatures(DescribeFeaturesOptions)} + * + * The API of this class is evolving. See {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class DescribeFeaturesOptions extends AbstractOptions<DescribeFeaturesOptions> { + + /** + * - True means the {@link Admin#describeFeatures(DescribeFeaturesOptions)} request can be Review comment: `can be issued only to the controller.`/ `must be processed by the controller` ########## File path: core/src/main/scala/kafka/controller/KafkaController.scala ########## @@ -266,6 +275,179 @@ class KafkaController(val config: KafkaConfig, } } + private def createFeatureZNode(newNode: FeatureZNode): Int = { + info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode") + zkClient.createFeatureZNode(newNode) + val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path) Review comment: Yea, I mean you could use `val newVersion = zkClient.getDataAndVersion(FeatureZNode.path)._2`, but it's up to you. ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -4052,6 +4058,128 @@ void handleFailure(Throwable throwable) { return new AlterClientQuotasResult(Collections.unmodifiableMap(futures)); } + @Override + public DescribeFeaturesResult describeFeatures(final DescribeFeaturesOptions options) { + final KafkaFutureImpl<FeatureMetadata> future = new KafkaFutureImpl<>(); + final long now = time.milliseconds(); + final NodeProvider provider = + options.sendRequestToController() ? new ControllerNodeProvider() : new LeastLoadedNodeProvider(); + + Call call = new Call( + "describeFeatures", calcDeadlineMs(now, options.timeoutMs()), provider) { + + @Override + ApiVersionsRequest.Builder createRequest(int timeoutMs) { + return new ApiVersionsRequest.Builder(); + } + + @Override + void handleResponse(AbstractResponse response) { + final ApiVersionsResponse apiVersionsResponse = (ApiVersionsResponse) response; + if (apiVersionsResponse.data.errorCode() == Errors.NONE.code()) { + future.complete( + new FeatureMetadata( + apiVersionsResponse.finalizedFeatures(), + apiVersionsResponse.finalizedFeaturesEpoch(), + apiVersionsResponse.supportedFeatures())); + } else if (options.sendRequestToController() && apiVersionsResponse.data.errorCode() == Errors.NOT_CONTROLLER.code()) { + handleNotControllerError(Errors.NOT_CONTROLLER); + } else { + future.completeExceptionally( + Errors.forCode(apiVersionsResponse.data.errorCode()).exception()); + } + } + + @Override + void handleFailure(Throwable throwable) { + completeAllExceptionally(Collections.singletonList(future), throwable); + } + }; + + runnable.call(call, now); + return new DescribeFeaturesResult(future); + } + + @Override + public UpdateFeaturesResult updateFeatures( + final Map<String, FeatureUpdate> featureUpdates, final UpdateFeaturesOptions options) { + if (featureUpdates == null || featureUpdates.isEmpty()) { + throw new IllegalArgumentException("Feature updates can not be null or empty."); + } + Objects.requireNonNull(options, "UpdateFeaturesOptions can not be null"); + + final Map<String, KafkaFutureImpl<Void>> updateFutures = new HashMap<>(); + final UpdateFeaturesRequestData.FeatureUpdateKeyCollection featureUpdatesRequestData Review comment: I suggest we build a static method in the `UpdateFeaturesRequest` class to avoid exposing the sub modules of feature data, such like: ``` public static UpdateFeaturesRequestData getFeatureRequest(final Map<String, FeatureUpdate> featureUpdate); ``` ########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -466,6 +477,42 @@ private static DescribeGroupsResponseData prepareDescribeGroupsResponseData(Stri Collections.emptySet())); return data; } + + private static UpdateFeaturesResponse prepareUpdateFeaturesResponse(Map<String, Errors> featureUpdateErrors) { Review comment: Could be moved to the `UpdateFeaturesResponse` ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala ########## @@ -82,18 +108,54 @@ object FinalizedFeatureCache extends Logging { " The existing cache contents are %s").format(latest, oldFeatureAndEpoch) throw new FeatureCacheUpdateException(errorMsg) } else { - val incompatibleFeatures = SupportedFeatures.incompatibleFeatures(latest.features) + val incompatibleFeatures = brokerFeatures.incompatibleFeatures(latest.features) if (!incompatibleFeatures.empty) { val errorMsg = ("FinalizedFeatureCache update failed since feature compatibility" + " checks failed! Supported %s has incompatibilities with the latest %s." - ).format(SupportedFeatures.get, latest) + ).format(brokerFeatures.supportedFeatures, latest) throw new FeatureCacheUpdateException(errorMsg) } else { - val logMsg = "Updated cache from existing finalized %s to latest finalized %s".format( + val logMsg = "Updated cache from existing %s to latest %s".format( oldFeatureAndEpoch, latest) - featuresAndEpoch = Some(latest) + synchronized { + featuresAndEpoch = Some(latest) + notifyAll() + } info(logMsg) } } } + + /** + * Causes the current thread to wait no more than timeoutMs for the specified condition to be met. + * It is guaranteed that the provided condition will always be invoked only from within a + * synchronized block. + * + * @param waitCondition the condition to be waited upon: + * - if the condition returns true, then, the wait will stop. + * - if the condition returns false, it means the wait must continue until + * timeout. + * + * @param timeoutMs the timeout (in milli seconds) + * + * @throws TimeoutException if the condition is not met within timeoutMs. + */ + private def waitUntilConditionOrThrow(waitCondition: () => Boolean, timeoutMs: Long): Unit = { + if(timeoutMs < 0L) { + throw new IllegalArgumentException(s"Expected timeoutMs >= 0, but $timeoutMs was provided.") + } + val waitEndTimeNanos = System.nanoTime() + (timeoutMs * 1_000_000) Review comment: Why don't we just use `System.currentTimeMillis()` to avoid conversion between nano time? ########## File path: core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala ########## @@ -0,0 +1,550 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util +import java.util.Properties +import java.util.concurrent.ExecutionException + +import kafka.api.KAFKA_2_7_IV0 +import kafka.utils.TestUtils +import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion} +import kafka.utils.TestUtils.waitUntilTrue +import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, FeatureMetadata, FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult} +import org.apache.kafka.common.errors.InvalidRequestException +import org.apache.kafka.common.feature.FinalizedVersionRange +import org.apache.kafka.common.feature.{Features, SupportedVersionRange} +import org.apache.kafka.common.message.UpdateFeaturesRequestData +import org.apache.kafka.common.message.UpdateFeaturesRequestData.FeatureUpdateKeyCollection +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{UpdateFeaturesRequest, UpdateFeaturesResponse} +import org.apache.kafka.common.utils.Utils +import org.junit.Test +import org.junit.Assert.{assertEquals, assertFalse, assertNotEquals, assertNotNull, assertTrue} +import org.scalatest.Assertions.{assertThrows, intercept} + +import scala.jdk.CollectionConverters._ +import scala.reflect.ClassTag +import scala.util.matching.Regex + +class UpdateFeaturesTest extends BaseRequestTest { + + override def brokerCount = 3 + + override def brokerPropertyOverrides(props: Properties): Unit = { + props.put(KafkaConfig.InterBrokerProtocolVersionProp, KAFKA_2_7_IV0.toString) + } + + private def defaultSupportedFeatures(): Features[SupportedVersionRange] = { + Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3)))) + } + + private def defaultFinalizedFeatures(): Features[FinalizedVersionRange] = { + Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 2)))) + } + + private def updateSupportedFeatures( + features: Features[SupportedVersionRange], targetServers: Set[KafkaServer]): Unit = { + targetServers.foreach(s => { + s.brokerFeatures.setSupportedFeatures(features) + s.zkClient.updateBrokerInfo(s.createBrokerInfo) + }) + + // Wait until updates to all BrokerZNode supported features propagate to the controller. + val brokerIds = targetServers.map(s => s.config.brokerId) + waitUntilTrue( + () => servers.exists(s => { + if (s.kafkaController.isActive) { + s.kafkaController.controllerContext.liveOrShuttingDownBrokers + .filter(b => brokerIds.contains(b.id)) + .forall(b => { + b.features.equals(features) + }) + } else { + false + } + }), + "Controller did not get broker updates") + } + + private def updateSupportedFeaturesInAllBrokers(features: Features[SupportedVersionRange]): Unit = { + updateSupportedFeatures(features, Set[KafkaServer]() ++ servers) + } + + private def updateDefaultMinVersionLevelsInAllBrokers(newMinVersionLevels: Map[String, Short]): Unit = { + servers.foreach(s => { + s.brokerFeatures.setDefaultMinVersionLevels(newMinVersionLevels) + }) + } + + private def updateFeatureZNode(features: Features[FinalizedVersionRange]): Int = { + val server = serverForId(0).get + val newNode = new FeatureZNode(FeatureZNodeStatus.Enabled, features) + val newVersion = server.zkClient.updateFeatureZNode(newNode) + servers.foreach(s => { + s.featureCache.waitUntilEpochOrThrow(newVersion, s.config.zkConnectionTimeoutMs) + }) + newVersion + } + + private def getFeatureZNode(): FeatureZNode = { + val (mayBeFeatureZNodeBytes, version) = serverForId(0).get.zkClient.getDataAndVersion(FeatureZNode.path) + assertNotEquals(version, ZkVersion.UnknownVersion) + FeatureZNode.decode(mayBeFeatureZNodeBytes.get) + } + + private def checkFeatures(client: Admin, expectedNode: FeatureZNode, expectedMetadata: FeatureMetadata): Unit = { + assertEquals(expectedNode, getFeatureZNode()) + val featureMetadata = client.describeFeatures( + new DescribeFeaturesOptions().sendRequestToController(true)).featureMetadata().get() + assertEquals(expectedMetadata, featureMetadata) + } + + private def checkException[ExceptionType <: Throwable](result: UpdateFeaturesResult, + featureExceptionMsgPatterns: Map[String, Regex]) + (implicit tag: ClassTag[ExceptionType]): Unit = { + featureExceptionMsgPatterns.foreach { + case (feature, exceptionMsgPattern) => + val exception = intercept[ExecutionException] { + result.values().get(feature).get() + } + val cause = exception.getCause + assertNotNull(cause) + assertEquals(cause.getClass, tag.runtimeClass) + assertTrue(cause.getMessage, exceptionMsgPattern.findFirstIn(cause.getMessage).isDefined) + } + + } + + /** + * Tests whether an invalid feature update does not get processed on the server as expected, + * and raises the ExceptionType on the client side as expected. + * + * @param invalidUpdate the invalid feature update to be sent in the + * updateFeatures request to the server + * @param exceptionMsgPattern a pattern for the expected exception message + */ + private def testWithInvalidFeatureUpdate[ExceptionType <: Throwable](feature: String, + invalidUpdate: FeatureUpdate, + exceptionMsgPattern: Regex) + (implicit tag: ClassTag[ExceptionType]): Unit = { + TestUtils.waitUntilControllerElected(zkClient) + + updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures()) + val versionBefore = updateFeatureZNode(defaultFinalizedFeatures()) + val adminClient = createAdminClient() + val nodeBefore = getFeatureZNode() + + val result = adminClient.updateFeatures(Utils.mkMap(Utils.mkEntry(feature, invalidUpdate)), new UpdateFeaturesOptions()) + + checkException[ExceptionType](result, Map(feature -> exceptionMsgPattern)) + checkFeatures( + adminClient, + nodeBefore, + new FeatureMetadata(defaultFinalizedFeatures(), versionBefore, defaultSupportedFeatures())) + } + + @Test + def testShouldFailRequestIfNotController(): Unit = { + TestUtils.waitUntilControllerElected(zkClient) + + updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures()) + val versionBefore = updateFeatureZNode(defaultFinalizedFeatures()) + + val nodeBefore = getFeatureZNode() + val updates = new FeatureUpdateKeyCollection() + val update = new UpdateFeaturesRequestData.FeatureUpdateKey(); + update.setFeature("feature_1"); + update.setMaxVersionLevel(defaultSupportedFeatures().get("feature_1").max()) + update.setAllowDowngrade(false) + updates.add(update) + + val response = connectAndReceive[UpdateFeaturesResponse]( + new UpdateFeaturesRequest.Builder(new UpdateFeaturesRequestData().setFeatureUpdates(updates)).build(), + notControllerSocketServer) + + assertEquals(1, response.data.results.size) + val result = response.data.results.asScala.head + assertEquals("feature_1", result.feature) + assertEquals(Errors.NOT_CONTROLLER, Errors.forCode(result.errorCode)) + assertNotNull(result.errorMessage) + assertFalse(result.errorMessage.isEmpty) + checkFeatures( + createAdminClient(), + nodeBefore, + new FeatureMetadata(defaultFinalizedFeatures(), versionBefore, defaultSupportedFeatures())) + } + + @Test + def testShouldFailRequestForEmptyUpdates(): Unit = { + val nullMap: util.Map[String, FeatureUpdate] = null + val emptyMap: util.Map[String, FeatureUpdate] = Utils.mkMap() + Set(nullMap, emptyMap).foreach { updates => + val client = createAdminClient() + val exception = intercept[IllegalArgumentException] { + client.updateFeatures(updates, new UpdateFeaturesOptions()) + } + assertNotNull(exception) + assertEquals("Feature updates can not be null or empty.", exception.getMessage) + } + } + + @Test + def testShouldFailRequestForNullUpdateFeaturesOptions(): Unit = { + val client = createAdminClient() + val update = new FeatureUpdate(defaultSupportedFeatures().get("feature_1").max(), false) + val exception = intercept[NullPointerException] { + client.updateFeatures(Utils.mkMap(Utils.mkEntry("feature_1", update)), null) + } + assertNotNull(exception) + assertEquals("UpdateFeaturesOptions can not be null", exception.getMessage) + } + + @Test + def testShouldFailRequestForInvalidFeatureName(): Unit = { + val client = createAdminClient() + val update = new FeatureUpdate(defaultSupportedFeatures().get("feature_1").max(), false) + val exception = intercept[IllegalArgumentException] { + client.updateFeatures(Utils.mkMap(Utils.mkEntry("", update)), new UpdateFeaturesOptions()) + } + assertNotNull(exception) + assertTrue((".*Provided feature can not be null or empty.*"r).findFirstIn(exception.getMessage).isDefined) + } + + @Test + def testShouldFailRequestWhenDowngradeFlagIsNotSetDuringDowngrade(): Unit = { + testWithInvalidFeatureUpdate[InvalidRequestException]( + "feature_1", + new FeatureUpdate((defaultFinalizedFeatures().get("feature_1").max() - 1).asInstanceOf[Short],false), + ".*Can not downgrade finalized feature: 'feature_1'.*allowDowngrade.*".r) + } + + @Test + def testShouldFailRequestWhenDowngradeToHigherVersionLevelIsAttempted(): Unit = { + testWithInvalidFeatureUpdate[InvalidRequestException]( + "feature_1", + new FeatureUpdate(defaultSupportedFeatures().get("feature_1").max(), true), + ".*finalized feature: 'feature_1'.*allowDowngrade.* provided maxVersionLevel:3.*existing maxVersionLevel:2.*".r) + } + + @Test + def testShouldFailRequestInClientWhenDowngradeFlagIsNotSetDuringDeletion(): Unit = { + assertThrows[IllegalArgumentException] { + new FeatureUpdate(0, false) + } + } + + @Test + def testShouldFailRequestInServerWhenDowngradeFlagIsNotSetDuringDeletion(): Unit = { + TestUtils.waitUntilControllerElected(zkClient) + + updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures()) + val versionBefore = updateFeatureZNode(defaultFinalizedFeatures()) + + val adminClient = createAdminClient() + val nodeBefore = getFeatureZNode() + + val updates + = new UpdateFeaturesRequestData.FeatureUpdateKeyCollection(); + val update = new UpdateFeaturesRequestData.FeatureUpdateKey(); + update.setFeature("feature_1") + update.setMaxVersionLevel(0) + update.setAllowDowngrade(false) + updates.add(update); + val requestData = new UpdateFeaturesRequestData() + requestData.setFeatureUpdates(updates); + + val response = connectAndReceive[UpdateFeaturesResponse]( + new UpdateFeaturesRequest.Builder(new UpdateFeaturesRequestData().setFeatureUpdates(updates)).build(), + controllerSocketServer) + + assertEquals(1, response.data().results().size()) + val result = response.data.results.asScala.head + assertEquals("feature_1", result.feature) + assertEquals(Errors.INVALID_REQUEST, Errors.forCode(result.errorCode)) + assertNotNull(result.errorMessage) + assertFalse(result.errorMessage.isEmpty) + val exceptionMsgPattern = ".*Can not provide maxVersionLevel: 0 less than 1 for feature: 'feature_1'.*allowDowngrade.*".r + assertTrue(exceptionMsgPattern.findFirstIn(result.errorMessage).isDefined) + checkFeatures( + adminClient, + nodeBefore, + new FeatureMetadata(defaultFinalizedFeatures(), versionBefore, defaultSupportedFeatures())) + } + + @Test + def testShouldFailRequestDuringDeletionOfNonExistingFeature(): Unit = { + testWithInvalidFeatureUpdate[InvalidRequestException]( + "feature_non_existing", + new FeatureUpdate(0, true), + ".*Can not delete non-existing finalized feature: 'feature_non_existing'.*".r) + } + + @Test + def testShouldFailRequestWhenUpgradingToSameVersionLevel(): Unit = { + testWithInvalidFeatureUpdate[InvalidRequestException]( + "feature_1", + new FeatureUpdate(defaultFinalizedFeatures().get("feature_1").max(), false), + ".*Can not upgrade a finalized feature: 'feature_1'.*to the same value.*".r) + } + + @Test + def testShouldFailRequestWhenDowngradingBelowMinVersionLevel(): Unit = { + TestUtils.waitUntilControllerElected(zkClient) + + updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures()) + val minVersionLevel = 2.asInstanceOf[Short] + updateDefaultMinVersionLevelsInAllBrokers(Map[String, Short]("feature_1" -> minVersionLevel)) + val initialFinalizedFeatures = Features.finalizedFeatures( + Utils.mkMap(Utils.mkEntry("feature_1", new FinalizedVersionRange(minVersionLevel, 2)))) + val versionBefore = updateFeatureZNode(initialFinalizedFeatures) + + val update = new FeatureUpdate((minVersionLevel - 1).asInstanceOf[Short], true) + val adminClient = createAdminClient() + val nodeBefore = getFeatureZNode() + + val result = adminClient.updateFeatures( + Utils.mkMap(Utils.mkEntry("feature_1", update)), new UpdateFeaturesOptions()) + + checkException[InvalidRequestException]( + result, + Map("feature_1" -> ".*Can not downgrade finalized feature: 'feature_1' to maxVersionLevel:1.*existing minVersionLevel:2.*".r)) + checkFeatures( + adminClient, + nodeBefore, + new FeatureMetadata(initialFinalizedFeatures, versionBefore, defaultSupportedFeatures())) + } + + @Test + def testShouldFailRequestDuringBrokerMaxVersionLevelIncompatibility(): Unit = { + TestUtils.waitUntilControllerElected(zkClient) + + val controller = servers.filter { server => server.kafkaController.isActive}.head + val nonControllerServers = servers.filter { server => !server.kafkaController.isActive} + val unsupportedBrokers = Set[KafkaServer](nonControllerServers.head) + val supportedBrokers = Set[KafkaServer](nonControllerServers(1), controller) + + updateSupportedFeatures(defaultSupportedFeatures(), supportedBrokers) + + val validMinVersion = defaultSupportedFeatures().get("feature_1").min() + val unsupportedMaxVersion = + (defaultSupportedFeatures().get("feature_1").max() - 1).asInstanceOf[Short] + val badSupportedFeatures = Features.supportedFeatures( + Utils.mkMap( + Utils.mkEntry("feature_1", + new SupportedVersionRange( + validMinVersion, + unsupportedMaxVersion)))) + updateSupportedFeatures(badSupportedFeatures, unsupportedBrokers) + + val versionBefore = updateFeatureZNode(defaultFinalizedFeatures()) + + val invalidUpdate = new FeatureUpdate(defaultSupportedFeatures().get("feature_1").max(), false) + val nodeBefore = getFeatureZNode() + val adminClient = createAdminClient() + val result = adminClient.updateFeatures( + Utils.mkMap(Utils.mkEntry("feature_1", invalidUpdate)), + new UpdateFeaturesOptions()) + + checkException[InvalidRequestException](result, Map("feature_1" -> ".*1 broker.*incompatible.*".r)) + checkFeatures( + adminClient, + nodeBefore, + new FeatureMetadata(defaultFinalizedFeatures(), versionBefore, defaultSupportedFeatures())) + } + + @Test + def testSuccessfulFeatureUpgradeAndWithNoExistingFinalizedFeatures(): Unit = { + TestUtils.waitUntilControllerElected(zkClient) + + updateSupportedFeaturesInAllBrokers( + Features.supportedFeatures( + Utils.mkMap( + Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3)), + Utils.mkEntry("feature_2", new SupportedVersionRange(2, 5))))) + updateDefaultMinVersionLevelsInAllBrokers(Map[String, Short]("feature_1" -> 1, "feature_2" -> 2)) + val versionBefore = updateFeatureZNode(Features.emptyFinalizedFeatures()) + + val targetFinalizedFeatures = Features.finalizedFeatures( + Utils.mkMap( + Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 3)), + Utils.mkEntry("feature_2", new FinalizedVersionRange(2, 3)))) + val update1 = new FeatureUpdate(targetFinalizedFeatures.get("feature_1").max(), false) + val update2 = new FeatureUpdate(targetFinalizedFeatures.get("feature_2").max(), false) + + val expected = new FeatureMetadata( + targetFinalizedFeatures, + versionBefore + 1, + Features.supportedFeatures( + Utils.mkMap( + Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3)), + Utils.mkEntry("feature_2", new SupportedVersionRange(2, 5))))) + + val adminClient = createAdminClient() + adminClient.updateFeatures( + Utils.mkMap(Utils.mkEntry("feature_1", update1), Utils.mkEntry("feature_2", update2)), + new UpdateFeaturesOptions() + ).all().get() + + checkFeatures( + adminClient, + new FeatureZNode(FeatureZNodeStatus.Enabled, targetFinalizedFeatures), + expected) + } + + @Test + def testSuccessfulFeatureUpgradeAndDowngrade(): Unit = { + TestUtils.waitUntilControllerElected(zkClient) + + updateSupportedFeaturesInAllBrokers( + Features.supportedFeatures( + Utils.mkMap( + Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3)), + Utils.mkEntry("feature_2", new SupportedVersionRange(2, 5))))) + updateDefaultMinVersionLevelsInAllBrokers(Map[String, Short]("feature_1" -> 1, "feature_2" -> 2)) + val versionBefore = updateFeatureZNode( + Features.finalizedFeatures( + Utils.mkMap( + Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 2)), + Utils.mkEntry("feature_2", new FinalizedVersionRange(2, 4))))) + + val targetFinalizedFeatures = Features.finalizedFeatures( + Utils.mkMap( + Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 3)), + Utils.mkEntry("feature_2", new FinalizedVersionRange(2, 3)))) + val update1 = new FeatureUpdate(targetFinalizedFeatures.get("feature_1").max(), false) + val update2 = new FeatureUpdate(targetFinalizedFeatures.get("feature_2").max(), true) + + val expected = new FeatureMetadata( + targetFinalizedFeatures, + versionBefore + 1, + Features.supportedFeatures( + Utils.mkMap( + Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3)), + Utils.mkEntry("feature_2", new SupportedVersionRange(2, 5))))) + + val adminClient = createAdminClient() + adminClient.updateFeatures( + Utils.mkMap(Utils.mkEntry("feature_1", update1), Utils.mkEntry("feature_2", update2)), + new UpdateFeaturesOptions() + ).all().get() + + checkFeatures( + adminClient, + new FeatureZNode(FeatureZNodeStatus.Enabled, targetFinalizedFeatures), + expected) + } + + @Test + def testPartialSuccessDuringValidFeatureUpgradeAndInvalidDowngrade(): Unit = { + TestUtils.waitUntilControllerElected(zkClient) + + val initialSupportedFeatures = Features.supportedFeatures( Review comment: nit: this could be extracted as a common struct. ########## File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json ########## @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 50, + "type": "request", + "name": "UpdateFeaturesRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+", + "about": "The list of updates to finalized features.", "fields": [ + {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true, + "about": "The name of the finalized feature to be updated."}, + {"name": "MaxVersionLevel", "type": "int16", "versions": "0+", Review comment: Same here ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java ########## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.feature.Features; +import org.apache.kafka.common.feature.FinalizedVersionRange; +import org.apache.kafka.common.feature.SupportedVersionRange; + +/** + * Encapsulates details about finalized as well as supported features. This is particularly useful + * to hold the result returned by the {@link Admin#describeFeatures(DescribeFeaturesOptions)} API. + */ +public class FeatureMetadata { + + private final Features<FinalizedVersionRange> finalizedFeatures; + + private final Optional<Integer> finalizedFeaturesEpoch; + + private final Features<SupportedVersionRange> supportedFeatures; + + public FeatureMetadata( Review comment: Try to put first parameter on the same line as the constructor, and align the rest parameters. ########## File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json ########## @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 50, + "type": "request", + "name": "UpdateFeaturesRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+", + "about": "The list of updates to finalized features.", "fields": [ + {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true, Review comment: Space ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java ########## @@ -1214,6 +1215,71 @@ default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlterati */ AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options); + /** + * Describes finalized as well as supported features. By default, the request is issued to any + * broker. It can be optionally directed only to the controller via DescribeFeaturesOptions + * parameter. This is particularly useful if the user requires strongly consistent reads of + * finalized features. + * <p> + * The following exceptions can be anticipated when calling {@code get()} on the future from the + * returned {@link DescribeFeaturesResult}: + * <ul> + * <li>{@link org.apache.kafka.common.errors.TimeoutException} + * If the request timed out before the describe operation could finish.</li> + * </ul> + * <p> + * @param options the options to use + * + * @return the {@link DescribeFeaturesResult} containing the result + */ + DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options); + + /** + * Applies specified updates to finalized features. This operation is not transactional so it + * may succeed for some features while fail for others. + * <p> + * The API takes in a map of finalized feature name to {@link FeatureUpdate} that needs to be + * applied. Each entry in the map specifies the finalized feature to be added or updated or + * deleted, along with the new max feature version level value. This request is issued only to + * the controller since the API is only served by the controller. The return value contains an + * error code for each supplied {@link FeatureUpdate}, and the code indicates if the update + * succeeded or failed in the controller. + * <ul> + * <li>Downgrade of feature version level is not a regular operation/intent. It is only allowed + * in the controller if the {@link FeatureUpdate} has the allowDowngrade flag set - setting this + * flag conveys user intent to attempt downgrade of a feature max version level. Note that + * despite the allowDowngrade flag being set, certain downgrades may be rejected by the + * controller if it is deemed impossible.</li> + * <li>Deletion of a finalized feature version is not a regular operation/intent. It could be + * done by setting the allowDowngrade flag to true in the {@link FeatureUpdate}, and, setting + * the max version level to be less than 1.</li> + * </ul> + *<p> + * The following exceptions can be anticipated when calling {@code get()} on the futures + * obtained from the returned {@link UpdateFeaturesResult}: + * <ul> + * <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException} + * If the authenticated user didn't have alter access to the cluster.</li> + * <li>{@link org.apache.kafka.common.errors.InvalidRequestException} Review comment: should this a per feature error or a top level error? ########## File path: clients/src/main/resources/common/message/UpdateFeaturesResponse.json ########## @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 50, + "type": "response", + "name": "UpdateFeaturesResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ Review comment: For top level exception such as cluster authorization exception, we could just define a top level error code instead of check-marking every feature with the redundant error code. I know we have been a bit inconsistent in such a case, but personally feel having layered error codes could make the response handling clear of whether it is per feature issue, or a high level issue. ########## File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json ########## @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 50, + "type": "request", + "name": "UpdateFeaturesRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+", + "about": "The list of updates to finalized features.", "fields": [ + {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true, + "about": "The name of the finalized feature to be updated."}, + {"name": "MaxVersionLevel", "type": "int16", "versions": "0+", + "about": "The new maximum version level for the finalized feature. A value >= 1 is valid. A value < 1, is special, and can be used to request the deletion of the finalized feature."}, + {"name": "AllowDowngrade", "type": "bool", "versions": "0+", Review comment: Same here ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * Options for {@link AdminClient#describeFeatures(DescribeFeaturesOptions)} + * + * The API of this class is evolving. See {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class DescribeFeaturesOptions extends AbstractOptions<DescribeFeaturesOptions> { + + /** + * - True means the {@link Admin#describeFeatures(DescribeFeaturesOptions)} request can be + * issued only to the controller. + * - False means the {@link Admin#describeFeatures(DescribeFeaturesOptions)} request can be Review comment: `could be processed by any random broker` ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -4052,6 +4058,128 @@ void handleFailure(Throwable throwable) { return new AlterClientQuotasResult(Collections.unmodifiableMap(futures)); } + @Override + public DescribeFeaturesResult describeFeatures(final DescribeFeaturesOptions options) { + final KafkaFutureImpl<FeatureMetadata> future = new KafkaFutureImpl<>(); + final long now = time.milliseconds(); + final NodeProvider provider = + options.sendRequestToController() ? new ControllerNodeProvider() : new LeastLoadedNodeProvider(); + + Call call = new Call( + "describeFeatures", calcDeadlineMs(now, options.timeoutMs()), provider) { + + @Override + ApiVersionsRequest.Builder createRequest(int timeoutMs) { + return new ApiVersionsRequest.Builder(); + } + + @Override + void handleResponse(AbstractResponse response) { + final ApiVersionsResponse apiVersionsResponse = (ApiVersionsResponse) response; + if (apiVersionsResponse.data.errorCode() == Errors.NONE.code()) { + future.complete( + new FeatureMetadata( + apiVersionsResponse.finalizedFeatures(), + apiVersionsResponse.finalizedFeaturesEpoch(), + apiVersionsResponse.supportedFeatures())); + } else if (options.sendRequestToController() && apiVersionsResponse.data.errorCode() == Errors.NOT_CONTROLLER.code()) { + handleNotControllerError(Errors.NOT_CONTROLLER); + } else { + future.completeExceptionally( + Errors.forCode(apiVersionsResponse.data.errorCode()).exception()); + } + } + + @Override + void handleFailure(Throwable throwable) { + completeAllExceptionally(Collections.singletonList(future), throwable); + } + }; + + runnable.call(call, now); + return new DescribeFeaturesResult(future); + } + + @Override + public UpdateFeaturesResult updateFeatures( + final Map<String, FeatureUpdate> featureUpdates, final UpdateFeaturesOptions options) { + if (featureUpdates == null || featureUpdates.isEmpty()) { + throw new IllegalArgumentException("Feature updates can not be null or empty."); + } + Objects.requireNonNull(options, "UpdateFeaturesOptions can not be null"); + + final Map<String, KafkaFutureImpl<Void>> updateFutures = new HashMap<>(); + final UpdateFeaturesRequestData.FeatureUpdateKeyCollection featureUpdatesRequestData + = new UpdateFeaturesRequestData.FeatureUpdateKeyCollection(); + for (Map.Entry<String, FeatureUpdate> entry : featureUpdates.entrySet()) { + final String feature = entry.getKey(); + final FeatureUpdate update = entry.getValue(); + if (feature.trim().isEmpty()) { + throw new IllegalArgumentException("Provided feature can not be null or empty."); + } + + updateFutures.put(feature, new KafkaFutureImpl<>()); + final UpdateFeaturesRequestData.FeatureUpdateKey requestItem = + new UpdateFeaturesRequestData.FeatureUpdateKey(); + requestItem.setFeature(feature); + requestItem.setMaxVersionLevel(update.maxVersionLevel()); + requestItem.setAllowDowngrade(update.allowDowngrade()); + featureUpdatesRequestData.add(requestItem); + } + final UpdateFeaturesRequestData request = new UpdateFeaturesRequestData().setFeatureUpdates(featureUpdatesRequestData); + + final long now = time.milliseconds(); + final Call call = new Call("updateFeatures", calcDeadlineMs(now, options.timeoutMs()), + new ControllerNodeProvider()) { + + @Override + UpdateFeaturesRequest.Builder createRequest(int timeoutMs) { + return new UpdateFeaturesRequest.Builder(request); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + final UpdateFeaturesResponse response = + (UpdateFeaturesResponse) abstractResponse; + + // Check for controller change. + for (UpdatableFeatureResult result : response.data().results()) { + final Errors error = Errors.forCode(result.errorCode()); + if (error == Errors.NOT_CONTROLLER) { + handleNotControllerError(error); + throw error.exception(); + } + } + + for (UpdatableFeatureResult result : response.data().results()) { + final KafkaFutureImpl<Void> future = updateFutures.get(result.feature()); + if (future == null) { Review comment: Does this overlap with `completeUnrealizedFutures` check? We could just keep one to reduce the checking complexity. ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java ########## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.feature.Features; +import org.apache.kafka.common.feature.FinalizedVersionRange; +import org.apache.kafka.common.feature.SupportedVersionRange; + +/** + * Encapsulates details about finalized as well as supported features. This is particularly useful + * to hold the result returned by the {@link Admin#describeFeatures(DescribeFeaturesOptions)} API. + */ +public class FeatureMetadata { + + private final Features<FinalizedVersionRange> finalizedFeatures; + + private final Optional<Integer> finalizedFeaturesEpoch; + + private final Features<SupportedVersionRange> supportedFeatures; + + public FeatureMetadata( + final Features<FinalizedVersionRange> finalizedFeatures, + final int finalizedFeaturesEpoch, + final Features<SupportedVersionRange> supportedFeatures) { + Objects.requireNonNull(finalizedFeatures, "Provided finalizedFeatures can not be null."); + Objects.requireNonNull(supportedFeatures, "Provided supportedFeatures can not be null."); + this.finalizedFeatures = finalizedFeatures; + if (finalizedFeaturesEpoch >= 0) { + this.finalizedFeaturesEpoch = Optional.of(finalizedFeaturesEpoch); + } else { + this.finalizedFeaturesEpoch = Optional.empty(); + } + this.supportedFeatures = supportedFeatures; + } + + /** + * A map of finalized feature versions, with key being finalized feature name and value + * containing the min/max version levels for the finalized feature. + */ + public Features<FinalizedVersionRange> finalizedFeatures() { + return finalizedFeatures; + } + + /** + * The epoch for the finalized features. + * If the returned value is empty, it means the finalized features are absent/unavailable. + */ + public Optional<Integer> finalizedFeaturesEpoch() { + return finalizedFeaturesEpoch; + } + + /** + * A map of supported feature versions, with key being supported feature name and value + * containing the min/max version for the supported feature. + */ + public Features<SupportedVersionRange> supportedFeatures() { + return supportedFeatures; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (!(other instanceof FeatureMetadata)) { + return false; + } + + final FeatureMetadata that = (FeatureMetadata) other; + return Objects.equals(this.finalizedFeatures, that.finalizedFeatures) && + Objects.equals(this.finalizedFeaturesEpoch, that.finalizedFeaturesEpoch) && + Objects.equals(this.supportedFeatures, that.supportedFeatures); + } + + @Override + public int hashCode() { + return Objects.hash(finalizedFeatures, finalizedFeaturesEpoch, supportedFeatures); + } + + @Override + public String toString() { + return String.format( + "FeatureMetadata{finalized:%s, finalizedFeaturesEpoch:%d, supported:%s}", + finalizedFeatures, + finalizedFeaturesEpoch, Review comment: This won't work well with string format, consider doing `orElse` ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * Options for {@link AdminClient#describeFeatures(DescribeFeaturesOptions)} + * + * The API of this class is evolving. See {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class DescribeFeaturesOptions extends AbstractOptions<DescribeFeaturesOptions> { + + /** + * - True means the {@link Admin#describeFeatures(DescribeFeaturesOptions)} request can be + * issued only to the controller. + * - False means the {@link Admin#describeFeatures(DescribeFeaturesOptions)} request can be + * issued to any random broker. + */ + private boolean sendRequestToController = false; + + /** + * Sets a flag indicating that the describe features request should be issued to the controller. Review comment: Same here ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/FeatureUpdate.java ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import java.util.Map; +import java.util.Objects; + +/** + * Encapsulates details about an update to a finalized feature. This is particularly useful to + * define each feature update in the {@link Admin#updateFeatures(Map, UpdateFeaturesOptions)} API. + */ +public class FeatureUpdate { + private final short maxVersionLevel; + private final boolean allowDowngrade; + + /** + * @param maxVersionLevel the new maximum version level for the finalized feature. + * a value < 1 is special and indicates that the update is intended to + * delete the finalized feature, and should be accompanied by setting + * the allowDowngrade flag to true. + * @param allowDowngrade - true, if this feature update was meant to downgrade the existing + * maximum version level of the finalized feature. + * - false, otherwise. + */ + public FeatureUpdate(final short maxVersionLevel, final boolean allowDowngrade) { + if (maxVersionLevel < 1 && !allowDowngrade) { + throw new IllegalArgumentException(String.format( + "The allowDowngrade flag should be set when the provided maxVersionLevel:%d is < 1.", + maxVersionLevel)); + } + this.maxVersionLevel = maxVersionLevel; + this.allowDowngrade = allowDowngrade; + } + + public short maxVersionLevel() { + return maxVersionLevel; + } + + public boolean allowDowngrade() { + return allowDowngrade; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (!(other instanceof FeatureUpdate)) { + return false; + } + + final FeatureUpdate that = (FeatureUpdate) other; + return this.maxVersionLevel == that.maxVersionLevel && this.allowDowngrade == that.allowDowngrade; + } + + @Override + public int hashCode() { + return Objects.hash(maxVersionLevel, allowDowngrade); + } + + @Override + public String toString() { + return String.format("FeatureUpdate{maxVersionLevel:%d, allowDowngrade:%s}", maxVersionLevel, allowDowngrade); + } +} Review comment: new line ########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -3193,6 +3238,104 @@ public void testListOffsetsNonRetriableErrors() throws Exception { } } + @Test + public void testUpdateFeaturesDuringSuccess() throws Exception { + testUpdateFeaturesDuringError(Errors.NONE); + } + + @Test + public void testUpdateFeaturesInvalidRequestError() throws Exception { + testUpdateFeaturesDuringError(Errors.INVALID_REQUEST); + } + + @Test + public void testUpdateFeaturesUpdateFailedError() throws Exception { + testUpdateFeaturesDuringError(Errors.FEATURE_UPDATE_FAILED); + } + + private void testUpdateFeaturesDuringError(Errors error) throws Exception { + try (final AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().prepareResponse( + body -> body instanceof UpdateFeaturesRequest, + prepareUpdateFeaturesResponse(error)); + final KafkaFuture<Void> future = env.adminClient().updateFeatures( + new HashSet<>( + Arrays.asList( + new FeatureUpdate( + "test_feature_1", (short) 2, false), + new FeatureUpdate( + "test_feature_2", (short) 3, true))), + new UpdateFeaturesOptions().timeoutMs(10000)).result(); + if (error.exception() == null) { + future.get(); + } else { + final ExecutionException e = assertThrows(ExecutionException.class, + () -> future.get()); + assertEquals(e.getCause().getClass(), error.exception().getClass()); + } + } + } + + @Test + public void testUpdateFeaturesHandleNotControllerException() throws Exception { + try (final AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().prepareResponseFrom( + prepareUpdateFeaturesResponse(Errors.NOT_CONTROLLER), + env.cluster().nodeById(0)); + env.kafkaClient().prepareResponse(MetadataResponse.prepareResponse(env.cluster().nodes(), + env.cluster().clusterResource().clusterId(), + 1, + Collections.<MetadataResponse.TopicMetadata>emptyList())); + env.kafkaClient().prepareResponseFrom( Review comment: You are right, it seems not necessary. ########## File path: clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java ########## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.kafka.common.message.UpdateFeaturesResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; + + +/** + * Possible error codes: + * + * - {@link Errors#CLUSTER_AUTHORIZATION_FAILED} + * - {@link Errors#NOT_CONTROLLER} + * - {@link Errors#INVALID_REQUEST} + * - {@link Errors#FEATURE_UPDATE_FAILED} + */ +public class UpdateFeaturesResponse extends AbstractResponse { + + private final UpdateFeaturesResponseData data; + + public UpdateFeaturesResponse(UpdateFeaturesResponseData data) { + this.data = data; + } + + public UpdateFeaturesResponse(Struct struct) { + final short latestVersion = (short) (UpdateFeaturesResponseData.SCHEMAS.length - 1); + this.data = new UpdateFeaturesResponseData(struct, latestVersion); + } + + public UpdateFeaturesResponse(Struct struct, short version) { + this.data = new UpdateFeaturesResponseData(struct, version); + } + + public Map<String, ApiError> errors() { + return data.results().valuesSet().stream().collect( + Collectors.toMap( + result -> result.feature(), Review comment: nit: could be replaced with lambda ########## File path: clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java ########## @@ -143,7 +172,13 @@ public static ApiVersionsResponse apiVersionsResponse( Features<FinalizedVersionRange> finalizedFeatures, int finalizedFeaturesEpoch) { if (maxMagic == RecordBatch.CURRENT_MAGIC_VALUE && throttleTimeMs == DEFAULT_THROTTLE_TIME) { - return DEFAULT_API_VERSIONS_RESPONSE; + return new ApiVersionsResponse(createApiVersionsResponseData( + DEFAULT_API_VERSIONS_RESPONSE.throttleTimeMs(), + Errors.forCode(DEFAULT_API_VERSIONS_RESPONSE.data().errorCode()), + DEFAULT_API_VERSIONS_RESPONSE.data().apiKeys(), + latestSupportedFeatures, + finalizedFeatures, + finalizedFeaturesEpoch)); Review comment: Comment here since no better place: createApiVersionsResponse on L198 could be made private ########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -3615,6 +3662,137 @@ public void testListOffsetsNonRetriableErrors() throws Exception { } } + @Test + public void testUpdateFeaturesDuringSuccess() throws Exception { + testUpdateFeatures( + makeTestFeatureUpdates(), + makeTestFeatureUpdateErrors(Errors.NONE)); + } + + @Test + public void testUpdateFeaturesInvalidRequestError() throws Exception { + testUpdateFeatures( + makeTestFeatureUpdates(), + makeTestFeatureUpdateErrors(Errors.INVALID_REQUEST)); + } + + @Test + public void testUpdateFeaturesUpdateFailedError() throws Exception { + testUpdateFeatures( + makeTestFeatureUpdates(), + makeTestFeatureUpdateErrors(Errors.FEATURE_UPDATE_FAILED)); + } + + @Test + public void testUpdateFeaturesPartialSuccess() throws Exception { + final Map<String, Errors> errors = makeTestFeatureUpdateErrors(Errors.NONE); + errors.put("test_feature_2", Errors.INVALID_REQUEST); + testUpdateFeatures(makeTestFeatureUpdates(), errors); + } + + private Map<String, FeatureUpdate> makeTestFeatureUpdates() { + return Utils.mkMap( + Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2, false)), + Utils.mkEntry("test_feature_2", new FeatureUpdate((short) 3, true))); + } + + private Map<String, Errors> makeTestFeatureUpdateErrors(final Errors error) { + final Map<String, FeatureUpdate> updates = makeTestFeatureUpdates(); + final Map<String, Errors> errors = new HashMap<>(); + for (Map.Entry<String, FeatureUpdate> entry : updates.entrySet()) { + errors.put(entry.getKey(), error); + } + return errors; + } + + private void testUpdateFeatures(Map<String, FeatureUpdate> featureUpdates, + Map<String, Errors> featureUpdateErrors) throws Exception { + try (final AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().prepareResponse( + body -> body instanceof UpdateFeaturesRequest, + prepareUpdateFeaturesResponse(featureUpdateErrors)); + final Map<String, KafkaFuture<Void>> futures = env.adminClient().updateFeatures( + featureUpdates, + new UpdateFeaturesOptions().timeoutMs(10000)).values(); + for (Map.Entry<String, KafkaFuture<Void>> entry : futures.entrySet()) { + final KafkaFuture<Void> future = entry.getValue(); + final Errors error = featureUpdateErrors.get(entry.getKey()); + if (error == Errors.NONE) { + future.get(); + } else { + final ExecutionException e = assertThrows(ExecutionException.class, + () -> future.get()); Review comment: nit: could use lambda ########## File path: core/src/main/scala/kafka/controller/KafkaController.scala ########## @@ -266,6 +275,178 @@ class KafkaController(val config: KafkaConfig, } } + private def createFeatureZNode(newNode: FeatureZNode): Int = { + info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode") + zkClient.createFeatureZNode(newNode) + val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path) + newVersion + } + + private def updateFeatureZNode(updatedNode: FeatureZNode): Int = { + info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode") + zkClient.updateFeatureZNode(updatedNode) + } + + /** + * This method enables the feature versioning system (KIP-584). + * + * Development in Kafka (from a high level) is organized into features. Each feature is tracked by + * a name and a range of version numbers. A feature can be of two types: + * + * 1. Supported feature: + * A supported feature is represented by a name (String) and a range of versions (defined by a + * {@link SupportedVersionRange}). It refers to a feature that a particular broker advertises + * support for. Each broker advertises the version ranges of its own supported features in its + * own BrokerIdZNode. The contents of the advertisement are specific to the particular broker and + * do not represent any guarantee of a cluster-wide availability of the feature for any particular + * range of versions. + * + * 2. Finalized feature: + * A finalized feature is represented by a name (String) and a range of version levels (defined + * by a {@link FinalizedVersionRange}). Whenever the feature versioning system (KIP-584) is + * enabled, the finalized features are stored in the cluster-wide common FeatureZNode. + * In comparison to a supported feature, the key difference is that a finalized feature exists + * in ZK only when it is guaranteed to be supported by any random broker in the cluster for a + * specified range of version levels. Also, the controller is the only entity modifying the + * information about finalized features. + * + * This method sets up the FeatureZNode with enabled status, which means that the finalized + * features stored in the FeatureZNode are active. The enabled status should be written by the + * controller to the FeatureZNode only when the broker IBP config is greater than or equal to + * KAFKA_2_7_IV0. + * + * There are multiple cases handled here: + * + * 1. New cluster bootstrap: + * A new Kafka cluster (i.e. it is deployed first time) is almost always started with IBP config + * setting greater than or equal to KAFKA_2_7_IV0. We would like to start the cluster with all + * the possible supported features finalized immediately. Assuming this is the case, the + * controller will start up and notice that the FeatureZNode is absent in the new cluster, + * it will then create a FeatureZNode (with enabled status) containing the entire list of + * default supported features as its finalized features. + * + * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0: + * Imagine there is an existing Kafka cluster with IBP config less than KAFKA_2_7_IV0, and the + * broker binary has been upgraded to a newer version that supports the feature versioning + * system (KIP-584). This means the user is upgrading from an earlier version of the broker + * binary. In this case, we want to start with no finalized features and allow the user to + * finalize them whenever they are ready i.e. in the future whenever the user sets IBP config + * to be greater than or equal to KAFKA_2_7_IV0, then the user could start finalizing the + * features. This process ensures we do not enable all the possible features immediately after + * an upgrade, which could be harmful to Kafka. + * This is how we handle such a case: + * - Before the IBP config upgrade (i.e. IBP config set to less than KAFKA_2_7_IV0), the + * controller will start up and check if the FeatureZNode is absent. If absent, it will + * react by creating a FeatureZNode with disabled status and empty finalized features. + * Otherwise, if a node already exists in enabled status then the controller will just + * flip the status to disabled and clear the finalized features. + * - After the IBP config upgrade (i.e. IBP config set to greater than or equal to + * KAFKA_2_7_IV0), when the controller starts up it will check if the FeatureZNode exists + * and whether it is disabled. In such a case, it won’t upgrade all features immediately. + * Instead it will just switch the FeatureZNode status to enabled status. This lets the + * user finalize the features later. + * + * 3. Broker binary upgraded, with existing cluster IBP config >= KAFKA_2_7_IV0: + * Imagine an existing Kafka cluster with IBP config >= KAFKA_2_7_IV0, and the broker binary + * has just been upgraded to a newer version (that supports IBP config KAFKA_2_7_IV0 and higher). + * The controller will start up and find that a FeatureZNode is already present with enabled + * status and existing finalized features. In such a case, the controller needs to scan the + * existing finalized features and mutate them for the purpose of version level deprecation + * (if needed). + * This is how we handle this case: If an existing finalized feature is present in the default + * finalized features, then, its existing minimum version level is updated to the default + * minimum version level maintained in the BrokerFeatures object. The goal of this mutation is + * to permanently deprecate one or more feature version levels. The range of feature version + * levels deprecated are from the closed range: [existing_min_version_level, default_min_version_level]. + * NOTE: Deprecating a feature version level is an incompatible change, which requires a major + * release of Kafka. In such a release, the minimum version level maintained within the + * BrokerFeatures class is updated suitably to record the deprecation of the feature. + * + * 4. Broker downgrade: + * Imagine that a Kafka cluster exists already and the IBP config is greater than or equal to + * KAFKA_2_7_IV0. Then, the user decided to downgrade the cluster by setting IBP config to a + * value less than KAFKA_2_7_IV0. This means the user is also disabling the feature versioning + * system (KIP-584). In this case, when the controller starts up with the lower IBP config, it + * will switch the FeatureZNode status to disabled with empty features. + */ + private def enableFeatureVersioning(): Unit = { + val defaultFinalizedFeatures = brokerFeatures.getDefaultFinalizedFeatures + val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(FeatureZNode.path) + if (version == ZkVersion.UnknownVersion) { + val newVersion = createFeatureZNode(new FeatureZNode(FeatureZNodeStatus.Enabled, defaultFinalizedFeatures)) + featureCache.waitUntilEpochOrThrow(newVersion, config.zkConnectionTimeoutMs) + } else { + val existingFeatureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get) + var newFeatures: Features[FinalizedVersionRange] = Features.emptyFinalizedFeatures() + if (existingFeatureZNode.status.equals(FeatureZNodeStatus.Enabled)) { + newFeatures = Features.finalizedFeatures(existingFeatureZNode.features.features().asScala.map { + case (featureName, existingVersionRange) => + val brokerDefaultVersionRange = defaultFinalizedFeatures.get(featureName) + if (brokerDefaultVersionRange == null) { + warn(s"Existing finalized feature: $featureName with $existingVersionRange" + + s" is absent in default finalized $defaultFinalizedFeatures") + (featureName, existingVersionRange) + } else if (brokerDefaultVersionRange.max() >= existingVersionRange.max() && + brokerDefaultVersionRange.min() <= existingVersionRange.max()) { + // Through this change, we deprecate all version levels in the closed range: + // [existingVersionRange.min(), brokerDefaultVersionRange.min() - 1] + (featureName, new FinalizedVersionRange(brokerDefaultVersionRange.min(), existingVersionRange.max())) + } else { + // If the existing version levels fall completely outside the Review comment: Are we good to proceed in this case? When there is no overlapping between broker default features and remote finalized features, is the current controller still eligible? ########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -3615,6 +3662,137 @@ public void testListOffsetsNonRetriableErrors() throws Exception { } } + @Test + public void testUpdateFeaturesDuringSuccess() throws Exception { + testUpdateFeatures( + makeTestFeatureUpdates(), + makeTestFeatureUpdateErrors(Errors.NONE)); + } + + @Test + public void testUpdateFeaturesInvalidRequestError() throws Exception { + testUpdateFeatures( + makeTestFeatureUpdates(), + makeTestFeatureUpdateErrors(Errors.INVALID_REQUEST)); + } + + @Test + public void testUpdateFeaturesUpdateFailedError() throws Exception { + testUpdateFeatures( + makeTestFeatureUpdates(), + makeTestFeatureUpdateErrors(Errors.FEATURE_UPDATE_FAILED)); + } + + @Test + public void testUpdateFeaturesPartialSuccess() throws Exception { + final Map<String, Errors> errors = makeTestFeatureUpdateErrors(Errors.NONE); + errors.put("test_feature_2", Errors.INVALID_REQUEST); + testUpdateFeatures(makeTestFeatureUpdates(), errors); + } + + private Map<String, FeatureUpdate> makeTestFeatureUpdates() { + return Utils.mkMap( + Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2, false)), + Utils.mkEntry("test_feature_2", new FeatureUpdate((short) 3, true))); + } + + private Map<String, Errors> makeTestFeatureUpdateErrors(final Errors error) { + final Map<String, FeatureUpdate> updates = makeTestFeatureUpdates(); Review comment: Could we make `updates` as a pass-in parameter to avoid calling `makeTestFeatureUpdates` twice? ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -2956,6 +2959,37 @@ class KafkaApis(val requestChannel: RequestChannel, } } + def handleUpdateFeatures(request: RequestChannel.Request): Unit = { + val updateFeaturesRequest = request.body[UpdateFeaturesRequest] + def featureUpdateErrors(error: Errors, msgOverride: Option[String]): Map[String, ApiError] = { + updateFeaturesRequest.data().featureUpdates().asScala.map( + update => update.feature() -> new ApiError(error, msgOverride.getOrElse(error.message())) + ).toMap + } + + def sendResponseCallback(updateErrors: Map[String, ApiError]): Unit = { + val results = new UpdatableFeatureResultCollection() Review comment: Could be moved to `UpdateFeaturesResponse` as a utility. ########## File path: core/src/main/scala/kafka/controller/KafkaController.scala ########## @@ -266,6 +275,179 @@ class KafkaController(val config: KafkaConfig, } } + private def createFeatureZNode(newNode: FeatureZNode): Int = { + info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode") + zkClient.createFeatureZNode(newNode) + val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path) + newVersion + } + + private def updateFeatureZNode(updatedNode: FeatureZNode): Int = { + info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode") + zkClient.updateFeatureZNode(updatedNode) + } + + /** + * This method enables the feature versioning system (KIP-584). + * + * Development in Kafka (from a high level) is organized into features. Each feature is tracked by + * a name and a range of version numbers. A feature can be of two types: + * + * 1. Supported feature: + * A supported feature is represented by a name (String) and a range of versions (defined by a + * {@link SupportedVersionRange}). It refers to a feature that a particular broker advertises + * support for. Each broker advertises the version ranges of it’s own supported features in its + * own BrokerIdZnode. The contents of the advertisement are specific to the particular broker and + * do not represent any guarantee of a cluster-wide availability of the feature for any particular + * range of versions. + * + * 2. Finalized feature: + * A finalized feature is is represented by a name (String) and a range of version levels (defined + * by a {@link FinalizedVersionRange}). Whenever the feature versioning system (KIP-584) is + * enabled, the finalized features are stored in ZK in the cluster-wide common FeatureZNode. + * In comparison to a supported feature, the key difference is that a finalized feature exists + * in ZK only when it is guaranteed to be supported by any random broker in the cluster for a + * specified range of version levels. Also, the controller is the one and only entity modifying + * the information about finalized features and their version levels. + * + * This method sets up the FeatureZNode with enabled status. This status means the feature + * versioning system (KIP-584) is enabled, and, the finalized features stored in the FeatureZNode + * are active. This status should be written by the controller to the FeatureZNode only when the + * broker IBP config is greater than or equal to KAFKA_2_7_IV0. + * + * There are multiple cases handled here: + * + * 1. New cluster bootstrap: + * A new Kafka cluster (i.e. it is deployed first time) is almost always started with IBP config + * setting greater than or equal to KAFKA_2_7_IV0. We would like to start the cluster with all + * the possible supported features finalized immediately. Assuming this is the case, the + * controller will start up and notice that the FeatureZNode is absent in the new cluster, + * it will then create a FeatureZNode (with enabled status) containing the entire list of + * default supported features as its finalized features. + * + * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0: + * Imagine there is an existing Kafka cluster with IBP config less than KAFKA_2_7_IV0, and the + * Broker binary has been upgraded to a newer version that supports the feature versioning + * system (KIP-584). This means the user is upgrading from an earlier version of the Broker + * binary. In this case, we want to start with no finalized features and allow the user to + * finalize them whenever they are ready i.e. in the future whenever the user sets IBP config + * to be greater than or equal to KAFKA_2_7_IV0, then the user could start finalizing the + * features. The reason to do this is that enabling all the possible features immediately after + * an upgrade could be harmful to the cluster. + * This is how we handle such a case: + * - Before the IBP config upgrade (i.e. IBP config set to less than KAFKA_2_7_IV0), the + * controller will start up and check if the FeatureZNode is absent. If absent, then it + * will react by creating a FeatureZNode with disabled status and empty finalized features. + * Otherwise, if a node already exists in enabled status then the controller will just + * flip the status to disabled and clear the finalized features. + * - After the IBP config upgrade (i.e. IBP config set to greater than or equal to + * KAFKA_2_7_IV0), when the controller starts up it will check if the FeatureZNode exists + * and whether it is disabled. In such a case, it won’t upgrade all features immediately. + * Instead it will just switch the FeatureZNode status to enabled status. This lets the + * user finalize the features later. + * + * 3. Broker binary upgraded, with existing cluster IBP config >= KAFKA_2_7_IV0: + * Imagine an existing Kafka cluster with IBP config >= KAFKA_2_7_IV0, and the broker binary + * has just been upgraded to a newer version (that supports IBP config KAFKA_2_7_IV0 and higher). + * The controller will start up and find that a FeatureZNode is already present with enabled + * status and existing finalized features. In such a case, the controller needs to scan the + * existing finalized features and mutate them for the purpose of version level deprecation + * (if needed). + * This is how we handle this case: If an existing finalized feature is present in the default + * finalized features, then, it's existing minimum version level is updated to the default + * minimum version level maintained in the BrokerFeatures object. The goal of this mutation is + * to permanently deprecate one or more feature version levels. The range of feature version + * levels deprecated are from the closed range: [existing_min_version_level, default_min_version_level]. + * NOTE: Deprecating a feature version level is an incompatible change, which requires a major + * release of Kafka. In such a release, the minimum version level maintained within the + * BrokerFeatures class is updated suitably to record the deprecation of the feature. + * + * 4. Broker downgrade: + * Imagine that a Kafka cluster exists already and the IBP config is greater than or equal to + * KAFKA_2_7_IV0. Then, the user decided to downgrade the cluster by setting IBP config to a + * value less than KAFKA_2_7_IV0. This means the user is also disabling the feature versioning + * system (KIP-584). In this case, when the controller starts up with the lower IBP config, it + * will switch the FeatureZNode status to disabled with empty features. + */ + private def enableFeatureVersioning(): Unit = { + val defaultFinalizedFeatures = brokerFeatures.getDefaultFinalizedFeatures + val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(FeatureZNode.path) + if (version == ZkVersion.UnknownVersion) { + val newVersion = createFeatureZNode(new FeatureZNode(FeatureZNodeStatus.Enabled, defaultFinalizedFeatures)) + featureCache.waitUntilEpochOrThrow(newVersion, config.zkConnectionTimeoutMs) + } else { + val existingFeatureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get) + var newFeatures: Features[FinalizedVersionRange] = Features.emptyFinalizedFeatures() + if (existingFeatureZNode.status.equals(FeatureZNodeStatus.Enabled)) { + newFeatures = Features.finalizedFeatures(existingFeatureZNode.features.features().asScala.map { + case (featureName, existingVersionRange) => { + val brokerDefaultVersionRange = defaultFinalizedFeatures.get(featureName) + if (brokerDefaultVersionRange == null) { + warn(s"Existing finalized feature: $featureName with $existingVersionRange" + + s" is absent in default finalized $defaultFinalizedFeatures") + (featureName, existingVersionRange) + } else if (existingVersionRange.max() >= brokerDefaultVersionRange.min() && + brokerDefaultVersionRange.max() >= existingVersionRange.max()) { + // Through this change, we deprecate all version levels in the closed range: + // [existingVersionRange.min(), brokerDefaultVersionRange.min() - 1] + (featureName, new FinalizedVersionRange(brokerDefaultVersionRange.min(), existingVersionRange.max())) + } else { + // If the existing version levels fall completely outside the + // range of the default finalized version levels (i.e. no intersection), or, if the + // existing version levels are ineligible for a modification since they are + // incompatible with default finalized version levels, then we skip the update. + warn(s"Can not update minimum version level in finalized feature: $featureName," + + s" since the existing $existingVersionRange is not eligible for a change" + + s" based on the default $brokerDefaultVersionRange.") + (featureName, existingVersionRange) + } + } + }.asJava) + } + val newFeatureZNode = new FeatureZNode(FeatureZNodeStatus.Enabled, newFeatures) + if (!newFeatureZNode.equals(existingFeatureZNode)) { Review comment: I see, still wondering if we could just check whether `newFeatures` is equal to `existingFeatureZNode.features` ########## File path: core/src/main/scala/kafka/controller/KafkaController.scala ########## @@ -1647,6 +1844,185 @@ class KafkaController(val config: KafkaConfig, } } + /** + * Returns the new FinalizedVersionRange for the feature, if there are no feature + * incompatibilities seen with all known brokers for the provided feature update. + * Otherwise returns a suitable error. + * + * @param update the feature update to be processed (this can not be meant to delete the feature) + * + * @return the new FinalizedVersionRange or error, as described above. + */ + private def newFinalizedVersionRangeOrIncompatibilityError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, ApiError] = { + if (UpdateFeaturesRequest.isDeleteRequest(update)) { + throw new IllegalArgumentException(s"Provided feature update can not be meant to delete the feature: $update") + } Review comment: nit: new line ########## File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json ########## @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 50, + "type": "request", + "name": "UpdateFeaturesRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+", + "about": "The list of updates to finalized features.", "fields": [ + {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true, + "about": "The name of the finalized feature to be updated."}, + {"name": "MaxVersionLevel", "type": "int16", "versions": "0+", + "about": "The new maximum version level for the finalized feature. A value >= 1 is valid. A value < 1, is special, and can be used to request the deletion of the finalized feature."}, + {"name": "AllowDowngrade", "type": "bool", "versions": "0+", + "about": "When set to true, the finalized feature version level is allowed to be downgraded/deleted."} Review comment: Should we also mention that this flag would fail the request when we are not actually doing a downgrade? ########## File path: clients/src/main/java/org/apache/kafka/common/errors/FeatureUpdateFailedException.java ########## @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class FeatureUpdateFailedException extends ApiException { Review comment: Do we need to make this a public error? It seems only be used internally, so could be made private if we don't have intention to let user catch. ########## File path: core/src/main/scala/kafka/controller/KafkaController.scala ########## @@ -266,6 +275,178 @@ class KafkaController(val config: KafkaConfig, } } + private def createFeatureZNode(newNode: FeatureZNode): Int = { + info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode") + zkClient.createFeatureZNode(newNode) + val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path) + newVersion + } + + private def updateFeatureZNode(updatedNode: FeatureZNode): Int = { + info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode") + zkClient.updateFeatureZNode(updatedNode) + } + + /** + * This method enables the feature versioning system (KIP-584). + * + * Development in Kafka (from a high level) is organized into features. Each feature is tracked by + * a name and a range of version numbers. A feature can be of two types: + * + * 1. Supported feature: + * A supported feature is represented by a name (String) and a range of versions (defined by a + * {@link SupportedVersionRange}). It refers to a feature that a particular broker advertises + * support for. Each broker advertises the version ranges of its own supported features in its + * own BrokerIdZNode. The contents of the advertisement are specific to the particular broker and + * do not represent any guarantee of a cluster-wide availability of the feature for any particular + * range of versions. + * + * 2. Finalized feature: + * A finalized feature is represented by a name (String) and a range of version levels (defined + * by a {@link FinalizedVersionRange}). Whenever the feature versioning system (KIP-584) is + * enabled, the finalized features are stored in the cluster-wide common FeatureZNode. + * In comparison to a supported feature, the key difference is that a finalized feature exists + * in ZK only when it is guaranteed to be supported by any random broker in the cluster for a + * specified range of version levels. Also, the controller is the only entity modifying the + * information about finalized features. + * + * This method sets up the FeatureZNode with enabled status, which means that the finalized + * features stored in the FeatureZNode are active. The enabled status should be written by the + * controller to the FeatureZNode only when the broker IBP config is greater than or equal to + * KAFKA_2_7_IV0. + * + * There are multiple cases handled here: + * + * 1. New cluster bootstrap: + * A new Kafka cluster (i.e. it is deployed first time) is almost always started with IBP config + * setting greater than or equal to KAFKA_2_7_IV0. We would like to start the cluster with all + * the possible supported features finalized immediately. Assuming this is the case, the + * controller will start up and notice that the FeatureZNode is absent in the new cluster, + * it will then create a FeatureZNode (with enabled status) containing the entire list of + * default supported features as its finalized features. + * + * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0: + * Imagine there is an existing Kafka cluster with IBP config less than KAFKA_2_7_IV0, and the + * broker binary has been upgraded to a newer version that supports the feature versioning + * system (KIP-584). This means the user is upgrading from an earlier version of the broker + * binary. In this case, we want to start with no finalized features and allow the user to + * finalize them whenever they are ready i.e. in the future whenever the user sets IBP config + * to be greater than or equal to KAFKA_2_7_IV0, then the user could start finalizing the + * features. This process ensures we do not enable all the possible features immediately after + * an upgrade, which could be harmful to Kafka. + * This is how we handle such a case: + * - Before the IBP config upgrade (i.e. IBP config set to less than KAFKA_2_7_IV0), the + * controller will start up and check if the FeatureZNode is absent. If absent, it will + * react by creating a FeatureZNode with disabled status and empty finalized features. + * Otherwise, if a node already exists in enabled status then the controller will just + * flip the status to disabled and clear the finalized features. + * - After the IBP config upgrade (i.e. IBP config set to greater than or equal to + * KAFKA_2_7_IV0), when the controller starts up it will check if the FeatureZNode exists + * and whether it is disabled. In such a case, it won’t upgrade all features immediately. + * Instead it will just switch the FeatureZNode status to enabled status. This lets the + * user finalize the features later. + * + * 3. Broker binary upgraded, with existing cluster IBP config >= KAFKA_2_7_IV0: + * Imagine an existing Kafka cluster with IBP config >= KAFKA_2_7_IV0, and the broker binary + * has just been upgraded to a newer version (that supports IBP config KAFKA_2_7_IV0 and higher). + * The controller will start up and find that a FeatureZNode is already present with enabled + * status and existing finalized features. In such a case, the controller needs to scan the + * existing finalized features and mutate them for the purpose of version level deprecation + * (if needed). + * This is how we handle this case: If an existing finalized feature is present in the default + * finalized features, then, its existing minimum version level is updated to the default + * minimum version level maintained in the BrokerFeatures object. The goal of this mutation is + * to permanently deprecate one or more feature version levels. The range of feature version + * levels deprecated are from the closed range: [existing_min_version_level, default_min_version_level]. + * NOTE: Deprecating a feature version level is an incompatible change, which requires a major + * release of Kafka. In such a release, the minimum version level maintained within the + * BrokerFeatures class is updated suitably to record the deprecation of the feature. + * + * 4. Broker downgrade: + * Imagine that a Kafka cluster exists already and the IBP config is greater than or equal to + * KAFKA_2_7_IV0. Then, the user decided to downgrade the cluster by setting IBP config to a + * value less than KAFKA_2_7_IV0. This means the user is also disabling the feature versioning + * system (KIP-584). In this case, when the controller starts up with the lower IBP config, it + * will switch the FeatureZNode status to disabled with empty features. + */ + private def enableFeatureVersioning(): Unit = { + val defaultFinalizedFeatures = brokerFeatures.getDefaultFinalizedFeatures + val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(FeatureZNode.path) + if (version == ZkVersion.UnknownVersion) { + val newVersion = createFeatureZNode(new FeatureZNode(FeatureZNodeStatus.Enabled, defaultFinalizedFeatures)) + featureCache.waitUntilEpochOrThrow(newVersion, config.zkConnectionTimeoutMs) + } else { + val existingFeatureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get) + var newFeatures: Features[FinalizedVersionRange] = Features.emptyFinalizedFeatures() + if (existingFeatureZNode.status.equals(FeatureZNodeStatus.Enabled)) { + newFeatures = Features.finalizedFeatures(existingFeatureZNode.features.features().asScala.map { + case (featureName, existingVersionRange) => + val brokerDefaultVersionRange = defaultFinalizedFeatures.get(featureName) + if (brokerDefaultVersionRange == null) { + warn(s"Existing finalized feature: $featureName with $existingVersionRange" + + s" is absent in default finalized $defaultFinalizedFeatures") + (featureName, existingVersionRange) + } else if (brokerDefaultVersionRange.max() >= existingVersionRange.max() && + brokerDefaultVersionRange.min() <= existingVersionRange.max()) { + // Through this change, we deprecate all version levels in the closed range: + // [existingVersionRange.min(), brokerDefaultVersionRange.min() - 1] + (featureName, new FinalizedVersionRange(brokerDefaultVersionRange.min(), existingVersionRange.max())) + } else { + // If the existing version levels fall completely outside the + // range of the default finalized version levels (i.e. no intersection), or, if the + // existing version levels are ineligible for a modification since they are + // incompatible with default finalized version levels, then we skip the update. + warn(s"Can not update minimum version level in finalized feature: $featureName," + + s" since the existing $existingVersionRange is not eligible for a change" + + s" based on the default $brokerDefaultVersionRange.") + (featureName, existingVersionRange) + } + }.asJava) + } + val newFeatureZNode = new FeatureZNode(FeatureZNodeStatus.Enabled, newFeatures) + if (!newFeatureZNode.equals(existingFeatureZNode)) { + val newVersion = updateFeatureZNode(newFeatureZNode) + featureCache.waitUntilEpochOrThrow(newVersion, config.zkConnectionTimeoutMs) + } + } + } + + /** + * Disables the feature versioning system (KIP-584). + * + * Sets up the FeatureZNode with disabled status. This status means the feature versioning system + * (KIP-584) is disabled, and, the finalized features stored in the FeatureZNode are not relevant. + * This status should be written by the controller to the FeatureZNode only when the broker + * IBP config is less than KAFKA_2_7_IV0. + * + * NOTE: + * 1. When this method returns, existing finalized features (if any) will be cleared from the + * FeatureZNode. + * 2. This method, unlike enableFeatureVersioning() need not wait for the FinalizedFeatureCache + * to be updated, because, such updates to the cache (via FinalizedFeatureChangeListener) + * are disabled when IBP config is < than KAFKA_2_7_IV0. + */ + private def disableFeatureVersioning(): Unit = { + val newNode = FeatureZNode(FeatureZNodeStatus.Disabled, Features.emptyFinalizedFeatures()) + val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(FeatureZNode.path) + if (version == ZkVersion.UnknownVersion) { + createFeatureZNode(newNode) Review comment: Do we need to call `featureCache.waitUntilEpochOrThrow(newNode, config.zkConnectionTimeoutMs)` here to ensure the update is successful? ########## File path: core/src/main/scala/kafka/server/BrokerFeatures.scala ########## @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange} +import org.apache.kafka.common.feature.Features._ + +import scala.jdk.CollectionConverters._ + +/** + * A class that encapsulates the following: + * + * 1. The latest features supported by the Broker. + * + * 2. The default minimum version levels for specific features. This map enables feature + * version level deprecation. This is how it works: in order to deprecate feature version levels, + * in this map the default minimum version level of a feature can be set to a new value that's + * higher than 1 (let's call this latest_min_version_level). In doing so, the feature version levels + * in the closed range: [1, latest_min_version_level - 1] get deprecated by the controller logic + * that applies this map to persistent finalized feature state in ZK (this mutation happens + * during controller election and during finalized feature updates via the + * ApiKeys.UPDATE_FINALIZED_FEATURES api). This will automatically mean external clients of Kafka + * would need to stop using the finalized min version levels that have been deprecated. + * + * This class also provides APIs to check for incompatibilities between the features supported by + * the Broker and finalized features. This class is immutable in production. It provides few APIs to + * mutate state only for the purpose of testing. + */ +class BrokerFeatures private (@volatile var supportedFeatures: Features[SupportedVersionRange], + @volatile var defaultFeatureMinVersionLevels: Map[String, Short]) { + require(BrokerFeatures.areFeatureMinVersionLevelsCompatible( + supportedFeatures, defaultFeatureMinVersionLevels)) + + // For testing only. + def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit = { + require( + BrokerFeatures.areFeatureMinVersionLevelsCompatible(newFeatures, defaultFeatureMinVersionLevels)) + supportedFeatures = newFeatures + } + + /** + * Returns the default minimum version level for a specific feature. + * + * @param feature the name of the feature + * + * @return the default minimum version level for the feature if its defined. + * otherwise, returns 1. + */ + def defaultMinVersionLevel(feature: String): Short = { + defaultFeatureMinVersionLevels.getOrElse(feature, 1) + } + + // For testing only. + def setDefaultMinVersionLevels(newMinVersionLevels: Map[String, Short]): Unit = { + require( + BrokerFeatures.areFeatureMinVersionLevelsCompatible(supportedFeatures, newMinVersionLevels)) + defaultFeatureMinVersionLevels = newMinVersionLevels + } + + /** + * Returns the default finalized features that a new Kafka cluster with IBP config >= KAFKA_2_7_IV0 + * needs to be bootstrapped with. + */ + def getDefaultFinalizedFeatures: Features[FinalizedVersionRange] = { + Features.finalizedFeatures( + supportedFeatures.features.asScala.map { + case(name, versionRange) => ( + name, new FinalizedVersionRange(defaultMinVersionLevel(name), versionRange.max)) + }.asJava) + } + + /** + * Returns the set of feature names found to be incompatible. + * A feature incompatibility is a version mismatch between the latest feature supported by the + * Broker, and the provided finalized feature. This can happen because a provided finalized + * feature: + * 1) Does not exist in the Broker (i.e. it is unknown to the Broker). + * [OR] + * 2) Exists but the FinalizedVersionRange does not match with the + * supported feature's SupportedVersionRange. + * + * @param finalized The finalized features against which incompatibilities need to be checked for. + * + * @return The subset of input features which are incompatible. If the returned object + * is empty, it means there were no feature incompatibilities found. + */ + def incompatibleFeatures(finalized: Features[FinalizedVersionRange]): Features[FinalizedVersionRange] = { + BrokerFeatures.incompatibleFeatures(supportedFeatures, finalized, logIncompatibilities = true) + } +} + +object BrokerFeatures extends Logging { + + def createDefault(): BrokerFeatures = { + // The arguments are currently empty, but, in the future as we define features we should + // populate the required values here. + new BrokerFeatures(emptySupportedFeatures, Map[String, Short]()) + } + + /** + * Returns true if any of the provided finalized features are incompatible with the provided + * supported features. + * + * @param supportedFeatures The supported features to be compared + * @param finalizedFeatures The finalized features to be compared + * + * @return - True if there are any feature incompatibilities found. + * - False otherwise. + */ + def hasIncompatibleFeatures(supportedFeatures: Features[SupportedVersionRange], + finalizedFeatures: Features[FinalizedVersionRange]): Boolean = { + !incompatibleFeatures(supportedFeatures, finalizedFeatures, false).empty + } + + private def incompatibleFeatures(supportedFeatures: Features[SupportedVersionRange], + finalizedFeatures: Features[FinalizedVersionRange], + logIncompatibilities: Boolean): Features[FinalizedVersionRange] = { + val incompatibleFeaturesInfo = finalizedFeatures.features.asScala.map { + case (feature, versionLevels) => + val supportedVersions = supportedFeatures.get(feature) + if (supportedVersions == null) { + (feature, versionLevels, "{feature=%s, reason='Unsupported feature'}".format(feature)) + } else if (versionLevels.isIncompatibleWith(supportedVersions)) { + (feature, versionLevels, "{feature=%s, reason='%s is incompatible with %s'}".format( + feature, versionLevels, supportedVersions)) + } else { + (feature, versionLevels, null) + } + }.filter{ case(_, _, errorReason) => errorReason != null}.toList + + if (logIncompatibilities && incompatibleFeaturesInfo.nonEmpty) { + warn( + "Feature incompatibilities seen: " + incompatibleFeaturesInfo.map { + case(_, _, errorReason) => errorReason }) + } + Features.finalizedFeatures(incompatibleFeaturesInfo.map { + case(feature, versionLevels, _) => (feature, versionLevels) }.toMap.asJava) + } + + /** + * A check that ensures each feature defined with min version level is a supported feature, and + * the min version level value is valid (i.e. it is compatible with the supported version range). + * + * @param supportedFeatures the supported features + * @param featureMinVersionLevels the feature minimum version levels + * + * @return - true, if the above described check passes. + * - false, otherwise. + */ + private def areFeatureMinVersionLevelsCompatible( + supportedFeatures: Features[SupportedVersionRange], + featureMinVersionLevels: Map[String, Short] + ): Boolean = { + featureMinVersionLevels.forall { + case(featureName, minVersionLevel) => + val supportedFeature = supportedFeatures.get(featureName) + (supportedFeature != null) && + !new FinalizedVersionRange(minVersionLevel, supportedFeature.max()) Review comment: Could we get a static method instead of initiating a new `FinalizedVersionRange` for a comparison every time? ########## File path: core/src/main/scala/kafka/controller/KafkaController.scala ########## @@ -1647,6 +1844,188 @@ class KafkaController(val config: KafkaConfig, } } + /** + * Returns the new FinalizedVersionRange for the feature, if there are no feature + * incompatibilities seen with all known brokers for the provided feature update. + * Otherwise returns a suitable error. Review comment: State the error explicitly here. ########## File path: core/src/main/scala/kafka/controller/KafkaController.scala ########## @@ -1647,6 +1844,188 @@ class KafkaController(val config: KafkaConfig, } } + /** + * Returns the new FinalizedVersionRange for the feature, if there are no feature + * incompatibilities seen with all known brokers for the provided feature update. + * Otherwise returns a suitable error. + * + * @param update the feature update to be processed (this can not be meant to delete the feature) + * + * @return the new FinalizedVersionRange or error, as described above. + */ + private def newFinalizedVersionRangeOrIncompatibilityError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, ApiError] = { + if (UpdateFeaturesRequest.isDeleteRequest(update)) { + throw new IllegalArgumentException(s"Provided feature update can not be meant to delete the feature: $update") + } + // NOTE: Below we set the finalized min version level to be the default minimum version + // level. If the finalized feature already exists, then, this can cause deprecation of all + // version levels in the closed range: + // [existingVersionRange.min(), defaultMinVersionLevel - 1]. + val defaultMinVersionLevel = brokerFeatures.defaultMinVersionLevel(update.feature) + val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel, update.maxVersionLevel) + val numIncompatibleBrokers = controllerContext.liveOrShuttingDownBrokers.count(broker => { + val singleFinalizedFeature = + Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, newVersionRange))) + BrokerFeatures.hasIncompatibleFeatures(broker.features, singleFinalizedFeature) + }) + if (numIncompatibleBrokers == 0) { + Left(newVersionRange) + } else { + Right( + new ApiError(Errors.INVALID_REQUEST, + s"Could not apply finalized feature update because $numIncompatibleBrokers" + + " brokers were found to have incompatible features.")) + } + } + + /** + * Validate and process a finalized feature update. + * + * If the processing is successful, then, the return value contains: + * 1. the new FinalizedVersionRange for the feature, if the feature update was not meant to delete the feature. + * 2. Option.empty, if the feature update was meant to delete the feature. + * + * If the processing failed, then returned value contains a suitable ApiError. + * + * @param update the feature update to be processed. + * + * @return the new FinalizedVersionRange or error, as described above. + */ + private def processFeatureUpdate(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[Option[FinalizedVersionRange], ApiError] = { + val existingFeatures = featureCache.get + .map(featuresAndEpoch => featuresAndEpoch.features.features().asScala) + .getOrElse(Map[String, FinalizedVersionRange]()) + + def newVersionRangeOrError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[Option[FinalizedVersionRange], ApiError] = { + newFinalizedVersionRangeOrIncompatibilityError(update) + .fold(versionRange => Left(Some(versionRange)), error => Right(error)) + } + + if (update.feature.isEmpty) { + // Check that the feature name is not empty. + Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be empty.")) + } else { + val cacheEntry = existingFeatures.get(update.feature).orNull + + // We handle deletion requests separately from non-deletion requests. + if (UpdateFeaturesRequest.isDeleteRequest(update)) { + if (cacheEntry == null) { + // Disallow deletion of a non-existing finalized feature. + Right(new ApiError(Errors.INVALID_REQUEST, + s"Can not delete non-existing finalized feature: '${update.feature}'")) + } else { + Left(Option.empty) + } + } else if (update.maxVersionLevel() < 1) { + // Disallow deletion of a finalized feature without allowDowngrade flag set. + Right(new ApiError(Errors.INVALID_REQUEST, + s"Can not provide maxVersionLevel: ${update.maxVersionLevel} less" + + s" than 1 for feature: '${update.feature}' without setting the" + + " allowDowngrade flag to true in the request.")) + } else { + if (cacheEntry == null) { + newVersionRangeOrError(update) + } else { + if (update.maxVersionLevel == cacheEntry.max()) { + // Disallow a case where target maxVersionLevel matches existing maxVersionLevel. + Right(new ApiError(Errors.INVALID_REQUEST, + s"Can not ${if (update.allowDowngrade) "downgrade" else "upgrade"}" + + s" a finalized feature: '${update.feature}' from existing" + + s" maxVersionLevel:${cacheEntry.max} to the same value.")) + } else if (update.maxVersionLevel < cacheEntry.max && !update.allowDowngrade) { + // Disallow downgrade of a finalized feature without the allowDowngrade flag set. + Right(new ApiError(Errors.INVALID_REQUEST, + s"Can not downgrade finalized feature: '${update.feature}' from" + + s" existing maxVersionLevel:${cacheEntry.max} to provided" + + s" maxVersionLevel:${update.maxVersionLevel} without setting the" + + " allowDowngrade flag in the request.")) + } else if (update.allowDowngrade && update.maxVersionLevel > cacheEntry.max) { Review comment: I'm actually wondering whether this is too strict in the perspective of a user. If they accidentally set a feature version larger than the cache, what they only care about is to be able to change the version to it. So it's a matter of whether we think this is a user error, or this could happen when user gets stale feature information from a broker while the downgrade already succeed eventually. If we want to keep this check, it makes sense to update the meta comments around `allowDowngrade` to inform user that the request could fail when the target version is actually higher than the current finalized feature. ########## File path: core/src/main/scala/kafka/controller/KafkaController.scala ########## @@ -1647,6 +1844,185 @@ class KafkaController(val config: KafkaConfig, } } + /** + * Returns the new FinalizedVersionRange for the feature, if there are no feature + * incompatibilities seen with all known brokers for the provided feature update. + * Otherwise returns a suitable error. + * + * @param update the feature update to be processed (this can not be meant to delete the feature) + * + * @return the new FinalizedVersionRange or error, as described above. + */ + private def newFinalizedVersionRangeOrIncompatibilityError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, ApiError] = { + if (UpdateFeaturesRequest.isDeleteRequest(update)) { + throw new IllegalArgumentException(s"Provided feature update can not be meant to delete the feature: $update") + } + // NOTE: Below we set the finalized min version level to be the default minimum version + // level. If the finalized feature already exists, then, this can cause deprecation of all + // version levels in the closed range: + // [existingVersionRange.min(), defaultMinVersionLevel - 1]. + val defaultMinVersionLevel = brokerFeatures.defaultMinVersionLevel(update.feature) + val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel, update.maxVersionLevel) + val numIncompatibleBrokers = controllerContext.liveOrShuttingDownBrokers.count(broker => { + val singleFinalizedFeature = + Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, newVersionRange))) + BrokerFeatures.hasIncompatibleFeatures(broker.features, singleFinalizedFeature) + }) + if (numIncompatibleBrokers == 0) { + Left(newVersionRange) + } else { + Right( + new ApiError(Errors.INVALID_REQUEST, + s"Could not apply finalized feature update because $numIncompatibleBrokers" + + " brokers were found to have incompatible features.")) + } + } + + /** + * Validate and process a finalized feature update on an existing FinalizedVersionRange for the + * feature. + * + * If the processing is successful, then, the return value contains: + * 1. the new FinalizedVersionRange for the feature, if the feature update was not meant to delete the feature. + * 2. Option.empty, if the feature update was meant to delete the feature. + * + * If the processing failed, then returned value contains a suitable ApiError. + * + * @param update the feature update to be processed. + * @param existingVersionRange the existing FinalizedVersionRange which can be empty when no + * FinalizedVersionRange exists for the associated feature + * + * @return the new FinalizedVersionRange or error, as described above. + */ + private def processFeatureUpdate(update: UpdateFeaturesRequestData.FeatureUpdateKey, + existingVersionRange: Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] = { + def newVersionRangeOrError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[Option[FinalizedVersionRange], ApiError] = { + newFinalizedVersionRangeOrIncompatibilityError(update) + .fold(versionRange => Left(Some(versionRange)), error => Right(error)) + } + + if (update.feature.isEmpty) { + // Check that the feature name is not empty. + Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be empty.")) + } else { + // We handle deletion requests separately from non-deletion requests. + if (UpdateFeaturesRequest.isDeleteRequest(update)) { + if (existingVersionRange.isEmpty) { + // Disallow deletion of a non-existing finalized feature. + Right(new ApiError(Errors.INVALID_REQUEST, + s"Can not delete non-existing finalized feature: '${update.feature}'")) + } else { + Left(Option.empty) + } + } else if (update.maxVersionLevel() < 1) { Review comment: Is this case covered by the case on L1931? Could we merge both? ########## File path: core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala ########## @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange} +import org.junit.Assert.{assertEquals, assertThrows, assertTrue} +import org.junit.Test + +import scala.jdk.CollectionConverters._ + +class BrokerFeaturesTest { Review comment: Some methods in the `BrokerFeatures` are not covered by this suite, such as `defaultMinVersionLevel`, `getDefaultFinalizedFeatures` and `hasIncompatibleFeatures`, you could use code coverage tool to figure out any missing part. ########## File path: core/src/main/scala/kafka/controller/KafkaController.scala ########## @@ -1395,7 +1596,7 @@ class KafkaController(val config: KafkaConfig, if (newMetadataOpt.nonEmpty && oldMetadataOpt.nonEmpty) { val oldMetadata = oldMetadataOpt.get val newMetadata = newMetadataOpt.get - if (newMetadata.endPoints != oldMetadata.endPoints) { + if (newMetadata.endPoints != oldMetadata.endPoints || !oldMetadata.features.equals(newMetadata.features)) { Review comment: I see, still I'm a bit worried future changes could break this assumption. Not a bad idea to check `features != null`? ########## File path: core/src/main/scala/kafka/controller/KafkaController.scala ########## @@ -1647,6 +1844,185 @@ class KafkaController(val config: KafkaConfig, } } + /** + * Returns the new FinalizedVersionRange for the feature, if there are no feature + * incompatibilities seen with all known brokers for the provided feature update. + * Otherwise returns a suitable error. + * + * @param update the feature update to be processed (this can not be meant to delete the feature) + * + * @return the new FinalizedVersionRange or error, as described above. + */ + private def newFinalizedVersionRangeOrIncompatibilityError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, ApiError] = { + if (UpdateFeaturesRequest.isDeleteRequest(update)) { + throw new IllegalArgumentException(s"Provided feature update can not be meant to delete the feature: $update") + } + // NOTE: Below we set the finalized min version level to be the default minimum version + // level. If the finalized feature already exists, then, this can cause deprecation of all + // version levels in the closed range: + // [existingVersionRange.min(), defaultMinVersionLevel - 1]. + val defaultMinVersionLevel = brokerFeatures.defaultMinVersionLevel(update.feature) + val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel, update.maxVersionLevel) + val numIncompatibleBrokers = controllerContext.liveOrShuttingDownBrokers.count(broker => { + val singleFinalizedFeature = + Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, newVersionRange))) + BrokerFeatures.hasIncompatibleFeatures(broker.features, singleFinalizedFeature) + }) + if (numIncompatibleBrokers == 0) { + Left(newVersionRange) + } else { + Right( + new ApiError(Errors.INVALID_REQUEST, + s"Could not apply finalized feature update because $numIncompatibleBrokers" + + " brokers were found to have incompatible features.")) + } + } + + /** + * Validate and process a finalized feature update on an existing FinalizedVersionRange for the + * feature. + * + * If the processing is successful, then, the return value contains: + * 1. the new FinalizedVersionRange for the feature, if the feature update was not meant to delete the feature. + * 2. Option.empty, if the feature update was meant to delete the feature. + * + * If the processing failed, then returned value contains a suitable ApiError. + * + * @param update the feature update to be processed. + * @param existingVersionRange the existing FinalizedVersionRange which can be empty when no + * FinalizedVersionRange exists for the associated feature + * + * @return the new FinalizedVersionRange or error, as described above. + */ + private def processFeatureUpdate(update: UpdateFeaturesRequestData.FeatureUpdateKey, + existingVersionRange: Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] = { + def newVersionRangeOrError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[Option[FinalizedVersionRange], ApiError] = { + newFinalizedVersionRangeOrIncompatibilityError(update) + .fold(versionRange => Left(Some(versionRange)), error => Right(error)) + } + + if (update.feature.isEmpty) { + // Check that the feature name is not empty. + Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be empty.")) + } else { + // We handle deletion requests separately from non-deletion requests. + if (UpdateFeaturesRequest.isDeleteRequest(update)) { + if (existingVersionRange.isEmpty) { + // Disallow deletion of a non-existing finalized feature. + Right(new ApiError(Errors.INVALID_REQUEST, + s"Can not delete non-existing finalized feature: '${update.feature}'")) + } else { + Left(Option.empty) + } + } else if (update.maxVersionLevel() < 1) { + // Disallow deletion of a finalized feature without allowDowngrade flag set. + Right(new ApiError(Errors.INVALID_REQUEST, + s"Can not provide maxVersionLevel: ${update.maxVersionLevel} less" + + s" than 1 for feature: '${update.feature}' without setting the" + + " allowDowngrade flag to true in the request.")) + } else { + existingVersionRange.map(existing => + if (update.maxVersionLevel == existing.max) { + // Disallow a case where target maxVersionLevel matches existing maxVersionLevel. + Right(new ApiError(Errors.INVALID_REQUEST, + s"Can not ${if (update.allowDowngrade) "downgrade" else "upgrade"}" + + s" a finalized feature: '${update.feature}' from existing" + + s" maxVersionLevel:${existing.max} to the same value.")) + } else if (update.maxVersionLevel < existing.max && !update.allowDowngrade) { + // Disallow downgrade of a finalized feature without the allowDowngrade flag set. + Right(new ApiError(Errors.INVALID_REQUEST, + s"Can not downgrade finalized feature: '${update.feature}' from" + + s" existing maxVersionLevel:${existing.max} to provided" + + s" maxVersionLevel:${update.maxVersionLevel} without setting the" + + " allowDowngrade flag in the request.")) + } else if (update.allowDowngrade && update.maxVersionLevel > existing.max) { + // Disallow a request that sets allowDowngrade flag without specifying a + // maxVersionLevel that's lower than the existing maxVersionLevel. + Right(new ApiError(Errors.INVALID_REQUEST, + s"When finalized feature: '${update.feature}' has the allowDowngrade" + + " flag set in the request, the provided" + + s" maxVersionLevel:${update.maxVersionLevel} can not be greater than" + + s" existing maxVersionLevel:${existing.max}.")) + } else if (update.maxVersionLevel() < existing.min) { + // Disallow downgrade of a finalized feature below the existing finalized + // minVersionLevel. + Right(new ApiError(Errors.INVALID_REQUEST, + s"Can not downgrade finalized feature: '${update.feature}' to" + + s" maxVersionLevel:${update.maxVersionLevel} because it's lower than" + + s" the existing minVersionLevel:${existing.min}.")) + } else { + newVersionRangeOrError(update) + } + ).getOrElse(newVersionRangeOrError(update)) + } + } + } + + private def processFeatureUpdates(request: UpdateFeaturesRequest, + callback: UpdateFeaturesCallback): Unit = { + if (isActive) { + processFeatureUpdatesWithActiveController(request, callback) + } else { + val results = request.data().featureUpdates().asScala.map { + update => update.feature() -> new ApiError(Errors.NOT_CONTROLLER) + }.toMap + callback(results) + } + } + + private def processFeatureUpdatesWithActiveController(request: UpdateFeaturesRequest, + callback: UpdateFeaturesCallback): Unit = { + val updates = request.data.featureUpdates + val existingFeatures = featureCache.get + .map(featuresAndEpoch => featuresAndEpoch.features.features().asScala) + .getOrElse(Map[String, FinalizedVersionRange]()) + // Map of feature to FinalizedVersionRange. This contains the target features to be eventually + // written to FeatureZNode. + val targetFeatures = scala.collection.mutable.Map[String, FinalizedVersionRange]() ++ existingFeatures + // Map of feature to error. + var errors = scala.collection.mutable.Map[String, ApiError]() + + // Process each FeatureUpdate. + // If a FeatureUpdate is found to be valid, then the corresponding entry in errors would contain + // Errors.NONE. Otherwise the entry would contain the appropriate error. + updates.asScala.iterator.foreach { update => + processFeatureUpdate(update, existingFeatures.get(update.feature())) match { + case Left(newVersionRangeOrNone) => + newVersionRangeOrNone + .map(newVersionRange => targetFeatures += (update.feature() -> newVersionRange)) + .getOrElse(targetFeatures -= update.feature()) + errors += (update.feature() -> new ApiError(Errors.NONE)) + case Right(featureUpdateFailureReason) => + errors += (update.feature() -> featureUpdateFailureReason) + } + } + + if (existingFeatures.equals(targetFeatures)) { Review comment: Could you clarify the reasoning here? If structs are not the same, are we going to do a partial update? ########## File path: core/src/main/scala/kafka/controller/KafkaController.scala ########## @@ -1647,6 +1844,185 @@ class KafkaController(val config: KafkaConfig, } } + /** + * Returns the new FinalizedVersionRange for the feature, if there are no feature + * incompatibilities seen with all known brokers for the provided feature update. + * Otherwise returns a suitable error. + * + * @param update the feature update to be processed (this can not be meant to delete the feature) + * + * @return the new FinalizedVersionRange or error, as described above. + */ + private def newFinalizedVersionRangeOrIncompatibilityError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, ApiError] = { + if (UpdateFeaturesRequest.isDeleteRequest(update)) { + throw new IllegalArgumentException(s"Provided feature update can not be meant to delete the feature: $update") + } + // NOTE: Below we set the finalized min version level to be the default minimum version + // level. If the finalized feature already exists, then, this can cause deprecation of all + // version levels in the closed range: + // [existingVersionRange.min(), defaultMinVersionLevel - 1]. + val defaultMinVersionLevel = brokerFeatures.defaultMinVersionLevel(update.feature) + val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel, update.maxVersionLevel) + val numIncompatibleBrokers = controllerContext.liveOrShuttingDownBrokers.count(broker => { + val singleFinalizedFeature = + Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, newVersionRange))) + BrokerFeatures.hasIncompatibleFeatures(broker.features, singleFinalizedFeature) + }) + if (numIncompatibleBrokers == 0) { + Left(newVersionRange) + } else { + Right( + new ApiError(Errors.INVALID_REQUEST, + s"Could not apply finalized feature update because $numIncompatibleBrokers" + + " brokers were found to have incompatible features.")) + } + } + + /** + * Validate and process a finalized feature update on an existing FinalizedVersionRange for the + * feature. + * + * If the processing is successful, then, the return value contains: + * 1. the new FinalizedVersionRange for the feature, if the feature update was not meant to delete the feature. + * 2. Option.empty, if the feature update was meant to delete the feature. + * + * If the processing failed, then returned value contains a suitable ApiError. + * + * @param update the feature update to be processed. + * @param existingVersionRange the existing FinalizedVersionRange which can be empty when no + * FinalizedVersionRange exists for the associated feature + * + * @return the new FinalizedVersionRange or error, as described above. + */ + private def processFeatureUpdate(update: UpdateFeaturesRequestData.FeatureUpdateKey, + existingVersionRange: Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] = { + def newVersionRangeOrError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[Option[FinalizedVersionRange], ApiError] = { + newFinalizedVersionRangeOrIncompatibilityError(update) + .fold(versionRange => Left(Some(versionRange)), error => Right(error)) + } + + if (update.feature.isEmpty) { + // Check that the feature name is not empty. + Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be empty.")) + } else { + // We handle deletion requests separately from non-deletion requests. + if (UpdateFeaturesRequest.isDeleteRequest(update)) { + if (existingVersionRange.isEmpty) { + // Disallow deletion of a non-existing finalized feature. + Right(new ApiError(Errors.INVALID_REQUEST, + s"Can not delete non-existing finalized feature: '${update.feature}'")) + } else { + Left(Option.empty) + } + } else if (update.maxVersionLevel() < 1) { + // Disallow deletion of a finalized feature without allowDowngrade flag set. + Right(new ApiError(Errors.INVALID_REQUEST, + s"Can not provide maxVersionLevel: ${update.maxVersionLevel} less" + + s" than 1 for feature: '${update.feature}' without setting the" + + " allowDowngrade flag to true in the request.")) + } else { + existingVersionRange.map(existing => + if (update.maxVersionLevel == existing.max) { + // Disallow a case where target maxVersionLevel matches existing maxVersionLevel. + Right(new ApiError(Errors.INVALID_REQUEST, + s"Can not ${if (update.allowDowngrade) "downgrade" else "upgrade"}" + + s" a finalized feature: '${update.feature}' from existing" + + s" maxVersionLevel:${existing.max} to the same value.")) + } else if (update.maxVersionLevel < existing.max && !update.allowDowngrade) { + // Disallow downgrade of a finalized feature without the allowDowngrade flag set. + Right(new ApiError(Errors.INVALID_REQUEST, + s"Can not downgrade finalized feature: '${update.feature}' from" + + s" existing maxVersionLevel:${existing.max} to provided" + + s" maxVersionLevel:${update.maxVersionLevel} without setting the" + + " allowDowngrade flag in the request.")) + } else if (update.allowDowngrade && update.maxVersionLevel > existing.max) { + // Disallow a request that sets allowDowngrade flag without specifying a + // maxVersionLevel that's lower than the existing maxVersionLevel. + Right(new ApiError(Errors.INVALID_REQUEST, + s"When finalized feature: '${update.feature}' has the allowDowngrade" + + " flag set in the request, the provided" + + s" maxVersionLevel:${update.maxVersionLevel} can not be greater than" + + s" existing maxVersionLevel:${existing.max}.")) + } else if (update.maxVersionLevel() < existing.min) { Review comment: We should be consistent and remove `()` from `maxVersionLevel` ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala ########## @@ -39,7 +42,7 @@ case class FinalizedFeaturesAndEpoch(features: Features[FinalizedVersionRange], * * @see FinalizedFeatureChangeListener */ -object FinalizedFeatureCache extends Logging { +class FinalizedFeatureCache(private val brokerFeatures: BrokerFeatures) extends Logging { Review comment: The meta comment for `FinalizedFeatureCache` should be updated as it is now being accessed for both read and write ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -2945,6 +2948,130 @@ class KafkaApis(val requestChannel: RequestChannel, } } + def handleUpdateFeatures(request: RequestChannel.Request): Unit = { Review comment: Seems not covered yet ########## File path: core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala ########## @@ -78,25 +76,42 @@ class FinalizedFeatureChangeListenerTest extends ZooKeeperTestHarness { /** * Tests that the listener can be initialized, and that it can listen to ZK notifications * successfully from an "Enabled" FeatureZNode (the ZK data has no feature incompatibilities). + * Particularly the test checks if multiple notifications can be processed in ZK + * (i.e. whether the FeatureZNode watch can be re-established). */ @Test def testInitSuccessAndNotificationSuccess(): Unit = { - createSupportedFeatures() val initialFinalizedFeatures = createFinalizedFeatures() - val listener = createListener(Some(initialFinalizedFeatures)) + val brokerFeatures = createBrokerFeatures() + val cache = new FinalizedFeatureCache(brokerFeatures) + val listener = createListener(cache, Some(initialFinalizedFeatures)) - val updatedFinalizedFeaturesMap = Map[String, FinalizedVersionRange]( - "feature_1" -> new FinalizedVersionRange(2, 4)) - val updatedFinalizedFeatures = Features.finalizedFeatures(updatedFinalizedFeaturesMap.asJava) - zkClient.updateFeatureZNode(FeatureZNode(FeatureZNodeStatus.Enabled, updatedFinalizedFeatures)) - val (mayBeFeatureZNodeNewBytes, updatedVersion) = zkClient.getDataAndVersion(FeatureZNode.path) - assertNotEquals(updatedVersion, ZkVersion.UnknownVersion) - assertFalse(mayBeFeatureZNodeNewBytes.isEmpty) - assertTrue(updatedVersion > initialFinalizedFeatures.epoch) - TestUtils.waitUntilTrue(() => { - FinalizedFeatureCache.get.get.equals(FinalizedFeaturesAndEpoch(updatedFinalizedFeatures, updatedVersion)) - }, "Timed out waiting for FinalizedFeatureCache to be updated with new features") - assertTrue(listener.isListenerInitiated) + def updateAndCheckCache(finalizedFeatures: Features[FinalizedVersionRange]): Unit = { + zkClient.updateFeatureZNode(FeatureZNode(FeatureZNodeStatus.Enabled, finalizedFeatures)) + val (mayBeFeatureZNodeNewBytes, updatedVersion) = zkClient.getDataAndVersion(FeatureZNode.path) + assertNotEquals(updatedVersion, ZkVersion.UnknownVersion) + assertFalse(mayBeFeatureZNodeNewBytes.isEmpty) + assertTrue(updatedVersion > initialFinalizedFeatures.epoch) + + cache.waitUntilEpochOrThrow(updatedVersion, JTestUtils.DEFAULT_MAX_WAIT_MS) + assertEquals(FinalizedFeaturesAndEpoch(finalizedFeatures, updatedVersion), cache.get.get) + assertTrue(listener.isListenerInitiated) + } + + // Check if the write succeeds and a ZK notification is received that causes the feature cache + // to be populated. + updateAndCheckCache( + Features.finalizedFeatures( + Map[String, FinalizedVersionRange]( + "feature_1" -> new FinalizedVersionRange(2, 4)).asJava)) Review comment: Indentation is not right. ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -109,7 +109,9 @@ class KafkaApis(val requestChannel: RequestChannel, brokerTopicStats: BrokerTopicStats, val clusterId: String, time: Time, - val tokenManager: DelegationTokenManager) extends Logging { + val tokenManager: DelegationTokenManager, + val brokerFeatures: BrokerFeatures, Review comment: Could we only pass in `featureCache` to reduce the class coupling here? As we already have `brokerFeatures` as a private parameter, it shouldn't be too hard to set a helper to get supported features. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org