abbccdda commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r462453975



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1214,6 +1215,71 @@ default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlterati
      */
     AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
 
+    /**
+     * Describes finalized as well as supported features. By default, the request is issued to any
+     * broker. It can be optionally directed only to the controller via DescribeFeaturesOptions
+     * parameter. This is particularly useful if the user requires strongly consistent reads of
+     * finalized features.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} on the future from the
+     * returned {@link DescribeFeaturesResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the describe operation could finish.</li>
+     * </ul>
+     * <p>
+     * @param options   the options to use
+     *
+     * @return          the {@link DescribeFeaturesResult} containing the result
+     */
+    DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);

Review comment:
       Note that in the post-KIP-500 world this feature could still work, but the request would have to be redirected to the controller inherently on the broker side, instead of the client sending it to the controller directly. So in the comment we may want to phrase it so that the principle conveyed is `the request must be handled by the controller` rather than `the admin client must send this request to the controller`.
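       For example, the javadoc could read something like this (just a wording sketch, not final text):
   ```
   /**
    * Describes finalized as well as supported features. The request is handled by any
    * broker by default, but it can optionally be restricted so that only the controller
    * handles it (see {@link DescribeFeaturesOptions}), which is useful when strongly
    * consistent reads of the finalized features are required.
    */
   ```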

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1214,6 +1215,70 @@ default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlterati
      */
     AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
 
+    /**
+     * Describes finalized as well as supported features. By default, the request is issued to any
+     * broker. It can be optionally directed only to the controller via DescribeFeaturesOptions
+     * parameter. This is particularly useful if the user requires strongly consistent reads of
+     * finalized features.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} on the future from the
+     * returned {@link DescribeFeaturesResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the describe operation could finish.</li>
+     * </ul>
+     * <p>
+     * @param options   the options to use
+     *
+     * @return          the {@link DescribeFeaturesResult} containing the result
+     */
+    DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+
+    /**
+     * Applies specified updates to finalized features. This operation is not transactional so it
+     * may succeed for some features while fail for others.
+     * <p>
+     * The API takes in a map of finalized feature name to {@link FeatureUpdate} that need to be

Review comment:
       nit: s/name/names

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -983,8 +1144,25 @@ class KafkaController(val config: KafkaConfig,
    */
   private[controller] def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicPartition]): Unit = {
     try {
+      val filteredBrokers = scala.collection.mutable.Set[Int]() ++ brokers
+      if (config.isFeatureVersioningEnabled) {
+        def hasIncompatibleFeatures(broker: Broker): Boolean = {
+          val latestFinalizedFeatures = featureCache.get
+          if (latestFinalizedFeatures.isDefined) {
+            BrokerFeatures.hasIncompatibleFeatures(broker.features, latestFinalizedFeatures.get.features)
+          } else {
+            false
+          }
+        }
+        controllerContext.liveOrShuttingDownBrokers.foreach(broker => {
+          if (filteredBrokers.contains(broker.id) && hasIncompatibleFeatures(broker)) {

Review comment:
       I see. What would happen to a currently live broker if it couldn't get any metadata updates for a while? Would it shut itself down?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for {@link AdminClient#describeFeatures(DescribeFeaturesOptions)}
+ *
+ * The API of this class is evolving. See {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeFeaturesOptions extends AbstractOptions<DescribeFeaturesOptions> {
+
+    /**
+     * - True means the {@link Admin#describeFeatures(DescribeFeaturesOptions)} request can be

Review comment:
       `can be issued only to the controller.` / `must be processed by the controller`

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -266,6 +275,179 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+    info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+    zkClient.createFeatureZNode(newNode)
+    val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)

Review comment:
       Yeah, I mean you could use `val newVersion = zkClient.getDataAndVersion(FeatureZNode.path)._2`, but it's up to you.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4052,6 +4058,128 @@ void handleFailure(Throwable throwable) {
         return new AlterClientQuotasResult(Collections.unmodifiableMap(futures));
     }
 
+    @Override
+    public DescribeFeaturesResult describeFeatures(final DescribeFeaturesOptions options) {
+        final KafkaFutureImpl<FeatureMetadata> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final NodeProvider provider =
+            options.sendRequestToController() ? new ControllerNodeProvider() : new LeastLoadedNodeProvider();
+
+        Call call = new Call(
+            "describeFeatures", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            @Override
+            ApiVersionsRequest.Builder createRequest(int timeoutMs) {
+                return new ApiVersionsRequest.Builder();
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final ApiVersionsResponse apiVersionsResponse = (ApiVersionsResponse) response;
+                if (apiVersionsResponse.data.errorCode() == Errors.NONE.code()) {
+                    future.complete(
+                        new FeatureMetadata(
+                            apiVersionsResponse.finalizedFeatures(),
+                            apiVersionsResponse.finalizedFeaturesEpoch(),
+                            apiVersionsResponse.supportedFeatures()));
+                } else if (options.sendRequestToController() && apiVersionsResponse.data.errorCode() == Errors.NOT_CONTROLLER.code()) {
+                    handleNotControllerError(Errors.NOT_CONTROLLER);
+                } else {
+                    future.completeExceptionally(
+                        Errors.forCode(apiVersionsResponse.data.errorCode()).exception());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(Collections.singletonList(future), throwable);
+            }
+        };
+
+        runnable.call(call, now);
+        return new DescribeFeaturesResult(future);
+    }
+
+    @Override
+    public UpdateFeaturesResult updateFeatures(
+        final Map<String, FeatureUpdate> featureUpdates, final UpdateFeaturesOptions options) {
+        if (featureUpdates == null || featureUpdates.isEmpty()) {
+            throw new IllegalArgumentException("Feature updates can not be null or empty.");
+        }
+        Objects.requireNonNull(options, "UpdateFeaturesOptions can not be null");
+
+        final Map<String, KafkaFutureImpl<Void>> updateFutures = new HashMap<>();
+        final UpdateFeaturesRequestData.FeatureUpdateKeyCollection featureUpdatesRequestData

Review comment:
       I suggest we build a static method in the `UpdateFeaturesRequest` class to avoid exposing the sub-modules of the feature data, like:
   ```
   public static UpdateFeaturesRequestData getFeatureRequest(final Map<String, FeatureUpdate> featureUpdate);
   ```
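       A rough sketch of what that helper might look like (it just moves the loop that currently lives in `KafkaAdminClient#updateFeatures` behind this method):
   ```
   public static UpdateFeaturesRequestData getFeatureRequest(final Map<String, FeatureUpdate> featureUpdates) {
       final UpdateFeaturesRequestData.FeatureUpdateKeyCollection updates =
           new UpdateFeaturesRequestData.FeatureUpdateKeyCollection();
       for (Map.Entry<String, FeatureUpdate> entry : featureUpdates.entrySet()) {
           // One request item per feature, mirroring the existing loop in KafkaAdminClient.
           final UpdateFeaturesRequestData.FeatureUpdateKey item =
               new UpdateFeaturesRequestData.FeatureUpdateKey();
           item.setFeature(entry.getKey());
           item.setMaxVersionLevel(entry.getValue().maxVersionLevel());
           item.setAllowDowngrade(entry.getValue().allowDowngrade());
           updates.add(item);
       }
       return new UpdateFeaturesRequestData().setFeatureUpdates(updates);
   }
   ```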

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -466,6 +477,42 @@ private static DescribeGroupsResponseData prepareDescribeGroupsResponseData(Stri
                 Collections.emptySet()));
         return data;
     }
+
+    private static UpdateFeaturesResponse prepareUpdateFeaturesResponse(Map<String, Errors> featureUpdateErrors) {

Review comment:
       Could be moved to `UpdateFeaturesResponse`.
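       For instance, a static factory along these lines (result class and setter names assumed from the generated message classes):
   ```
   public static UpdateFeaturesResponse fromFeatureErrors(final Map<String, Errors> featureErrors) {
       final UpdateFeaturesResponseData.UpdatableFeatureResultCollection results =
           new UpdateFeaturesResponseData.UpdatableFeatureResultCollection();
       for (Map.Entry<String, Errors> entry : featureErrors.entrySet()) {
           // One result entry per feature, carrying that feature's error code and message.
           final UpdateFeaturesResponseData.UpdatableFeatureResult result =
               new UpdateFeaturesResponseData.UpdatableFeatureResult();
           result.setFeature(entry.getKey());
           result.setErrorCode(entry.getValue().code());
           result.setErrorMessage(entry.getValue().message());
           results.add(result);
       }
       return new UpdateFeaturesResponse(new UpdateFeaturesResponseData().setResults(results));
   }
   ```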

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -82,18 +108,54 @@ object FinalizedFeatureCache extends Logging {
         " The existing cache contents are %s").format(latest, 
oldFeatureAndEpoch)
       throw new FeatureCacheUpdateException(errorMsg)
     } else {
-      val incompatibleFeatures = 
SupportedFeatures.incompatibleFeatures(latest.features)
+      val incompatibleFeatures = 
brokerFeatures.incompatibleFeatures(latest.features)
       if (!incompatibleFeatures.empty) {
         val errorMsg = ("FinalizedFeatureCache update failed since feature 
compatibility" +
           " checks failed! Supported %s has incompatibilities with the latest 
%s."
-          ).format(SupportedFeatures.get, latest)
+          ).format(brokerFeatures.supportedFeatures, latest)
         throw new FeatureCacheUpdateException(errorMsg)
       } else {
-        val logMsg = "Updated cache from existing finalized %s to latest 
finalized %s".format(
+        val logMsg = "Updated cache from existing %s to latest %s".format(
           oldFeatureAndEpoch, latest)
-        featuresAndEpoch = Some(latest)
+        synchronized {
+          featuresAndEpoch = Some(latest)
+          notifyAll()
+        }
         info(logMsg)
       }
     }
   }
+
+  /**
+   * Causes the current thread to wait no more than timeoutMs for the specified condition to be met.
+   * It is guaranteed that the provided condition will always be invoked only from within a
+   * synchronized block.
+   *
+   * @param waitCondition   the condition to be waited upon:
+   *                         - if the condition returns true, then, the wait will stop.
+   *                         - if the condition returns false, it means the wait must continue until
+   *                           timeout.
+   *
+   * @param timeoutMs       the timeout (in milli seconds)
+   *
+   * @throws                TimeoutException if the condition is not met within timeoutMs.
+   */
+  private def waitUntilConditionOrThrow(waitCondition: () => Boolean, timeoutMs: Long): Unit = {
+    if(timeoutMs < 0L) {
+      throw new IllegalArgumentException(s"Expected timeoutMs >= 0, but $timeoutMs was provided.")
+    }
+    val waitEndTimeNanos = System.nanoTime() + (timeoutMs * 1_000_000)

Review comment:
       Why don't we just use `System.currentTimeMillis()` to avoid converting between nanoseconds and milliseconds, i.e. something like `val waitEndTimeMs = System.currentTimeMillis() + timeoutMs`?

##########
File path: core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala
##########
@@ -0,0 +1,550 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+import java.util.Properties
+import java.util.concurrent.ExecutionException
+
+import kafka.api.KAFKA_2_7_IV0
+import kafka.utils.TestUtils
+import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion}
+import kafka.utils.TestUtils.waitUntilTrue
+import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, FeatureMetadata, FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult}
+import org.apache.kafka.common.errors.InvalidRequestException
+import org.apache.kafka.common.feature.FinalizedVersionRange
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.message.UpdateFeaturesRequestData
+import org.apache.kafka.common.message.UpdateFeaturesRequestData.FeatureUpdateKeyCollection
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{UpdateFeaturesRequest, UpdateFeaturesResponse}
+import org.apache.kafka.common.utils.Utils
+import org.junit.Test
+import org.junit.Assert.{assertEquals, assertFalse, assertNotEquals, assertNotNull, assertTrue}
+import org.scalatest.Assertions.{assertThrows, intercept}
+
+import scala.jdk.CollectionConverters._
+import scala.reflect.ClassTag
+import scala.util.matching.Regex
+
+class UpdateFeaturesTest extends BaseRequestTest {
+
+  override def brokerCount = 3
+
+  override def brokerPropertyOverrides(props: Properties): Unit = {
+    props.put(KafkaConfig.InterBrokerProtocolVersionProp, KAFKA_2_7_IV0.toString)
+  }
+
+  private def defaultSupportedFeatures(): Features[SupportedVersionRange] = {
+    Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3))))
+  }
+
+  private def defaultFinalizedFeatures(): Features[FinalizedVersionRange] = {
+    Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 2))))
+  }
+
+  private def updateSupportedFeatures(
+    features: Features[SupportedVersionRange], targetServers: Set[KafkaServer]): Unit = {
+    targetServers.foreach(s => {
+      s.brokerFeatures.setSupportedFeatures(features)
+      s.zkClient.updateBrokerInfo(s.createBrokerInfo)
+    })
+
+    // Wait until updates to all BrokerZNode supported features propagate to the controller.
+    val brokerIds = targetServers.map(s => s.config.brokerId)
+    waitUntilTrue(
+      () => servers.exists(s => {
+        if (s.kafkaController.isActive) {
+          s.kafkaController.controllerContext.liveOrShuttingDownBrokers
+            .filter(b => brokerIds.contains(b.id))
+            .forall(b => {
+              b.features.equals(features)
+            })
+        } else {
+          false
+        }
+      }),
+      "Controller did not get broker updates")
+  }
+
+  private def updateSupportedFeaturesInAllBrokers(features: Features[SupportedVersionRange]): Unit = {
+    updateSupportedFeatures(features, Set[KafkaServer]() ++ servers)
+  }
+
+  private def updateDefaultMinVersionLevelsInAllBrokers(newMinVersionLevels: Map[String, Short]): Unit = {
+    servers.foreach(s => {
+      s.brokerFeatures.setDefaultMinVersionLevels(newMinVersionLevels)
+    })
+  }
+
+  private def updateFeatureZNode(features: Features[FinalizedVersionRange]): Int = {
+    val server = serverForId(0).get
+    val newNode = new FeatureZNode(FeatureZNodeStatus.Enabled, features)
+    val newVersion = server.zkClient.updateFeatureZNode(newNode)
+    servers.foreach(s => {
+      s.featureCache.waitUntilEpochOrThrow(newVersion, s.config.zkConnectionTimeoutMs)
+    })
+    newVersion
+  }
+
+  private def getFeatureZNode(): FeatureZNode = {
+    val (mayBeFeatureZNodeBytes, version) = serverForId(0).get.zkClient.getDataAndVersion(FeatureZNode.path)
+    assertNotEquals(version, ZkVersion.UnknownVersion)
+    FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+  }
+
+  private def checkFeatures(client: Admin, expectedNode: FeatureZNode, expectedMetadata: FeatureMetadata): Unit = {
+    assertEquals(expectedNode, getFeatureZNode())
+    val featureMetadata = client.describeFeatures(
+      new DescribeFeaturesOptions().sendRequestToController(true)).featureMetadata().get()
+    assertEquals(expectedMetadata, featureMetadata)
+  }
+
+  private def checkException[ExceptionType <: Throwable](result: UpdateFeaturesResult,
+                                                         featureExceptionMsgPatterns: Map[String, Regex])
+                                                        (implicit tag: ClassTag[ExceptionType]): Unit = {
+    featureExceptionMsgPatterns.foreach {
+      case (feature, exceptionMsgPattern) =>
+        val exception = intercept[ExecutionException] {
+          result.values().get(feature).get()
+        }
+        val cause = exception.getCause
+        assertNotNull(cause)
+        assertEquals(cause.getClass, tag.runtimeClass)
+        assertTrue(cause.getMessage, exceptionMsgPattern.findFirstIn(cause.getMessage).isDefined)
+    }
+
+  }
+
+  /**
+   * Tests whether an invalid feature update does not get processed on the server as expected,
+   * and raises the ExceptionType on the client side as expected.
+   *
+   * @param invalidUpdate         the invalid feature update to be sent in the
+   *                              updateFeatures request to the server
+   * @param exceptionMsgPattern   a pattern for the expected exception message
+   */
+  private def testWithInvalidFeatureUpdate[ExceptionType <: Throwable](feature: String,
+                                                                       invalidUpdate: FeatureUpdate,
+                                                                       exceptionMsgPattern: Regex)
+                                                                      (implicit tag: ClassTag[ExceptionType]): Unit = {
+    TestUtils.waitUntilControllerElected(zkClient)
+
+    updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures())
+    val versionBefore = updateFeatureZNode(defaultFinalizedFeatures())
+    val adminClient = createAdminClient()
+    val nodeBefore = getFeatureZNode()
+
+    val result = adminClient.updateFeatures(Utils.mkMap(Utils.mkEntry(feature, invalidUpdate)), new UpdateFeaturesOptions())
+
+    checkException[ExceptionType](result, Map(feature -> exceptionMsgPattern))
+    checkFeatures(
+      adminClient,
+      nodeBefore,
+      new FeatureMetadata(defaultFinalizedFeatures(), versionBefore, defaultSupportedFeatures()))
+  }
+
+  @Test
+  def testShouldFailRequestIfNotController(): Unit = {
+    TestUtils.waitUntilControllerElected(zkClient)
+
+    updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures())
+    val versionBefore = updateFeatureZNode(defaultFinalizedFeatures())
+
+    val nodeBefore = getFeatureZNode()
+    val updates = new FeatureUpdateKeyCollection()
+    val update = new UpdateFeaturesRequestData.FeatureUpdateKey();
+    update.setFeature("feature_1");
+    update.setMaxVersionLevel(defaultSupportedFeatures().get("feature_1").max())
+    update.setAllowDowngrade(false)
+    updates.add(update)
+
+    val response = connectAndReceive[UpdateFeaturesResponse](
+      new UpdateFeaturesRequest.Builder(new UpdateFeaturesRequestData().setFeatureUpdates(updates)).build(),
+      notControllerSocketServer)
+
+    assertEquals(1, response.data.results.size)
+    val result = response.data.results.asScala.head
+    assertEquals("feature_1", result.feature)
+    assertEquals(Errors.NOT_CONTROLLER, Errors.forCode(result.errorCode))
+    assertNotNull(result.errorMessage)
+    assertFalse(result.errorMessage.isEmpty)
+    checkFeatures(
+      createAdminClient(),
+      nodeBefore,
+      new FeatureMetadata(defaultFinalizedFeatures(), versionBefore, defaultSupportedFeatures()))
+  }
+
+  @Test
+  def testShouldFailRequestForEmptyUpdates(): Unit = {
+    val nullMap: util.Map[String, FeatureUpdate] = null
+    val emptyMap: util.Map[String, FeatureUpdate] = Utils.mkMap()
+    Set(nullMap, emptyMap).foreach { updates =>
+      val client = createAdminClient()
+      val exception = intercept[IllegalArgumentException] {
+        client.updateFeatures(updates, new UpdateFeaturesOptions())
+      }
+      assertNotNull(exception)
+      assertEquals("Feature updates can not be null or empty.", exception.getMessage)
+    }
+  }
+
+  @Test
+  def testShouldFailRequestForNullUpdateFeaturesOptions(): Unit = {
+    val client = createAdminClient()
+    val update = new FeatureUpdate(defaultSupportedFeatures().get("feature_1").max(), false)
+    val exception = intercept[NullPointerException] {
+      client.updateFeatures(Utils.mkMap(Utils.mkEntry("feature_1", update)), null)
+    }
+    assertNotNull(exception)
+    assertEquals("UpdateFeaturesOptions can not be null", exception.getMessage)
+  }
+
+  @Test
+  def testShouldFailRequestForInvalidFeatureName(): Unit = {
+    val client = createAdminClient()
+    val update = new FeatureUpdate(defaultSupportedFeatures().get("feature_1").max(), false)
+    val exception = intercept[IllegalArgumentException] {
+      client.updateFeatures(Utils.mkMap(Utils.mkEntry("", update)), new UpdateFeaturesOptions())
+    }
+    assertNotNull(exception)
+    assertTrue((".*Provided feature can not be null or empty.*"r).findFirstIn(exception.getMessage).isDefined)
+  }
+
+  @Test
+  def testShouldFailRequestWhenDowngradeFlagIsNotSetDuringDowngrade(): Unit = {
+    testWithInvalidFeatureUpdate[InvalidRequestException](
+      "feature_1",
+      new FeatureUpdate((defaultFinalizedFeatures().get("feature_1").max() - 1).asInstanceOf[Short],false),
+      ".*Can not downgrade finalized feature: 'feature_1'.*allowDowngrade.*".r)
+  }
+
+  @Test
+  def testShouldFailRequestWhenDowngradeToHigherVersionLevelIsAttempted(): Unit = {
+    testWithInvalidFeatureUpdate[InvalidRequestException](
+      "feature_1",
+      new FeatureUpdate(defaultSupportedFeatures().get("feature_1").max(), true),
+      ".*finalized feature: 'feature_1'.*allowDowngrade.* provided maxVersionLevel:3.*existing maxVersionLevel:2.*".r)
+  }
+
+  @Test
+  def testShouldFailRequestInClientWhenDowngradeFlagIsNotSetDuringDeletion(): Unit = {
+    assertThrows[IllegalArgumentException] {
+      new FeatureUpdate(0, false)
+    }
+  }
+
+  @Test
+  def testShouldFailRequestInServerWhenDowngradeFlagIsNotSetDuringDeletion(): Unit = {
+    TestUtils.waitUntilControllerElected(zkClient)
+
+    updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures())
+    val versionBefore = updateFeatureZNode(defaultFinalizedFeatures())
+
+    val adminClient = createAdminClient()
+    val nodeBefore = getFeatureZNode()
+
+    val updates = new UpdateFeaturesRequestData.FeatureUpdateKeyCollection();
+    val update = new UpdateFeaturesRequestData.FeatureUpdateKey();
+    update.setFeature("feature_1")
+    update.setMaxVersionLevel(0)
+    update.setAllowDowngrade(false)
+    updates.add(update);
+    val requestData = new UpdateFeaturesRequestData()
+    requestData.setFeatureUpdates(updates);
+
+    val response = connectAndReceive[UpdateFeaturesResponse](
+      new UpdateFeaturesRequest.Builder(new UpdateFeaturesRequestData().setFeatureUpdates(updates)).build(),
+      controllerSocketServer)
+
+    assertEquals(1, response.data().results().size())
+    val result = response.data.results.asScala.head
+    assertEquals("feature_1", result.feature)
+    assertEquals(Errors.INVALID_REQUEST, Errors.forCode(result.errorCode))
+    assertNotNull(result.errorMessage)
+    assertFalse(result.errorMessage.isEmpty)
+    val exceptionMsgPattern = ".*Can not provide maxVersionLevel: 0 less than 1 for feature: 'feature_1'.*allowDowngrade.*".r
+    assertTrue(exceptionMsgPattern.findFirstIn(result.errorMessage).isDefined)
+    checkFeatures(
+      adminClient,
+      nodeBefore,
+      new FeatureMetadata(defaultFinalizedFeatures(), versionBefore, defaultSupportedFeatures()))
+  }
+
+  @Test
+  def testShouldFailRequestDuringDeletionOfNonExistingFeature(): Unit = {
+    testWithInvalidFeatureUpdate[InvalidRequestException](
+      "feature_non_existing",
+      new FeatureUpdate(0, true),
+      ".*Can not delete non-existing finalized feature: 'feature_non_existing'.*".r)
+  }
+
+  @Test
+  def testShouldFailRequestWhenUpgradingToSameVersionLevel(): Unit = {
+    testWithInvalidFeatureUpdate[InvalidRequestException](
+      "feature_1",
+      new FeatureUpdate(defaultFinalizedFeatures().get("feature_1").max(), false),
+      ".*Can not upgrade a finalized feature: 'feature_1'.*to the same value.*".r)
+  }
+
+  @Test
+  def testShouldFailRequestWhenDowngradingBelowMinVersionLevel(): Unit = {
+    TestUtils.waitUntilControllerElected(zkClient)
+
+    updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures())
+    val minVersionLevel = 2.asInstanceOf[Short]
+    updateDefaultMinVersionLevelsInAllBrokers(Map[String, Short]("feature_1" -> minVersionLevel))
+    val initialFinalizedFeatures = Features.finalizedFeatures(
+      Utils.mkMap(Utils.mkEntry("feature_1", new FinalizedVersionRange(minVersionLevel, 2))))
+    val versionBefore = updateFeatureZNode(initialFinalizedFeatures)
+
+    val update = new FeatureUpdate((minVersionLevel - 1).asInstanceOf[Short], true)
+    val adminClient = createAdminClient()
+    val nodeBefore = getFeatureZNode()
+
+    val result = adminClient.updateFeatures(
+      Utils.mkMap(Utils.mkEntry("feature_1", update)), new UpdateFeaturesOptions())
+
+    checkException[InvalidRequestException](
+      result,
+      Map("feature_1" -> ".*Can not downgrade finalized feature: 'feature_1' to maxVersionLevel:1.*existing minVersionLevel:2.*".r))
+    checkFeatures(
+      adminClient,
+      nodeBefore,
+      new FeatureMetadata(initialFinalizedFeatures, versionBefore, defaultSupportedFeatures()))
+  }
+
+  @Test
+  def testShouldFailRequestDuringBrokerMaxVersionLevelIncompatibility(): Unit = {
+    TestUtils.waitUntilControllerElected(zkClient)
+
+    val controller = servers.filter { server => server.kafkaController.isActive}.head
+    val nonControllerServers = servers.filter { server => !server.kafkaController.isActive}
+    val unsupportedBrokers = Set[KafkaServer](nonControllerServers.head)
+    val supportedBrokers = Set[KafkaServer](nonControllerServers(1), controller)
+
+    updateSupportedFeatures(defaultSupportedFeatures(), supportedBrokers)
+
+    val validMinVersion = defaultSupportedFeatures().get("feature_1").min()
+    val unsupportedMaxVersion =
+      (defaultSupportedFeatures().get("feature_1").max() - 1).asInstanceOf[Short]
+    val badSupportedFeatures = Features.supportedFeatures(
+      Utils.mkMap(
+        Utils.mkEntry("feature_1",
+          new SupportedVersionRange(
+            validMinVersion,
+            unsupportedMaxVersion))))
+    updateSupportedFeatures(badSupportedFeatures, unsupportedBrokers)
+
+    val versionBefore = updateFeatureZNode(defaultFinalizedFeatures())
+
+    val invalidUpdate = new FeatureUpdate(defaultSupportedFeatures().get("feature_1").max(), false)
+    val nodeBefore = getFeatureZNode()
+    val adminClient = createAdminClient()
+    val result = adminClient.updateFeatures(
+      Utils.mkMap(Utils.mkEntry("feature_1", invalidUpdate)),
+      new UpdateFeaturesOptions())
+
+    checkException[InvalidRequestException](result, Map("feature_1" -> ".*1 broker.*incompatible.*".r))
+    checkFeatures(
+      adminClient,
+      nodeBefore,
+      new FeatureMetadata(defaultFinalizedFeatures(), versionBefore, defaultSupportedFeatures()))
+  }
+
+  @Test
+  def testSuccessfulFeatureUpgradeAndWithNoExistingFinalizedFeatures(): Unit = {
+    TestUtils.waitUntilControllerElected(zkClient)
+
+    updateSupportedFeaturesInAllBrokers(
+      Features.supportedFeatures(
+        Utils.mkMap(
+          Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3)),
+          Utils.mkEntry("feature_2", new SupportedVersionRange(2, 5)))))
+    updateDefaultMinVersionLevelsInAllBrokers(Map[String, Short]("feature_1" -> 1, "feature_2" -> 2))
+    val versionBefore = updateFeatureZNode(Features.emptyFinalizedFeatures())
+
+    val targetFinalizedFeatures = Features.finalizedFeatures(
+      Utils.mkMap(
+        Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 3)),
+        Utils.mkEntry("feature_2", new FinalizedVersionRange(2, 3))))
+    val update1 = new FeatureUpdate(targetFinalizedFeatures.get("feature_1").max(), false)
+    val update2 = new FeatureUpdate(targetFinalizedFeatures.get("feature_2").max(), false)
+
+    val expected = new FeatureMetadata(
+      targetFinalizedFeatures,
+      versionBefore + 1,
+      Features.supportedFeatures(
+        Utils.mkMap(
+          Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3)),
+          Utils.mkEntry("feature_2", new SupportedVersionRange(2, 5)))))
+
+    val adminClient = createAdminClient()
+    adminClient.updateFeatures(
+      Utils.mkMap(Utils.mkEntry("feature_1", update1), Utils.mkEntry("feature_2", update2)),
+      new UpdateFeaturesOptions()
+    ).all().get()
+
+    checkFeatures(
+      adminClient,
+      new FeatureZNode(FeatureZNodeStatus.Enabled, targetFinalizedFeatures),
+      expected)
+  }
+
+  @Test
+  def testSuccessfulFeatureUpgradeAndDowngrade(): Unit = {
+    TestUtils.waitUntilControllerElected(zkClient)
+
+    updateSupportedFeaturesInAllBrokers(
+      Features.supportedFeatures(
+        Utils.mkMap(
+          Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3)),
+          Utils.mkEntry("feature_2", new SupportedVersionRange(2, 5)))))
+    updateDefaultMinVersionLevelsInAllBrokers(Map[String, Short]("feature_1" -> 1, "feature_2" -> 2))
+    val versionBefore = updateFeatureZNode(
+      Features.finalizedFeatures(
+        Utils.mkMap(
+          Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 2)),
+          Utils.mkEntry("feature_2", new FinalizedVersionRange(2, 4)))))
+
+    val targetFinalizedFeatures = Features.finalizedFeatures(
+      Utils.mkMap(
+        Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 3)),
+        Utils.mkEntry("feature_2", new FinalizedVersionRange(2, 3))))
+    val update1 = new FeatureUpdate(targetFinalizedFeatures.get("feature_1").max(), false)
+    val update2 = new FeatureUpdate(targetFinalizedFeatures.get("feature_2").max(), true)
+
+    val expected = new FeatureMetadata(
+      targetFinalizedFeatures,
+      versionBefore + 1,
+      Features.supportedFeatures(
+        Utils.mkMap(
+          Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3)),
+          Utils.mkEntry("feature_2", new SupportedVersionRange(2, 5)))))
+
+    val adminClient = createAdminClient()
+    adminClient.updateFeatures(
+      Utils.mkMap(Utils.mkEntry("feature_1", update1), Utils.mkEntry("feature_2", update2)),
+      new UpdateFeaturesOptions()
+    ).all().get()
+
+    checkFeatures(
+      adminClient,
+      new FeatureZNode(FeatureZNodeStatus.Enabled, targetFinalizedFeatures),
+      expected)
+  }
+
+  @Test
+  def testPartialSuccessDuringValidFeatureUpgradeAndInvalidDowngrade(): Unit = {
+    TestUtils.waitUntilControllerElected(zkClient)
+
+    val initialSupportedFeatures = Features.supportedFeatures(

Review comment:
       nit: this could be extracted as a common struct.

##########
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##########
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "request",
+  "name": "UpdateFeaturesRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+",
+      "about": "The list of updates to finalized features.", "fields": [
+      {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true,
+        "about": "The name of the finalized feature to be updated."},
+      {"name": "MaxVersionLevel", "type": "int16", "versions": "0+",

Review comment:
       Same here

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.FinalizedVersionRange;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+
+/**
+ * Encapsulates details about finalized as well as supported features. This is particularly useful
+ * to hold the result returned by the {@link Admin#describeFeatures(DescribeFeaturesOptions)} API.
+ */
+public class FeatureMetadata {
+
+    private final Features<FinalizedVersionRange> finalizedFeatures;
+
+    private final Optional<Integer> finalizedFeaturesEpoch;
+
+    private final Features<SupportedVersionRange> supportedFeatures;
+
+    public FeatureMetadata(

Review comment:
       Try to put the first parameter on the same line as the constructor, and align the remaining parameters with it.
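       i.e. something like:
   ```
   public FeatureMetadata(final Features<FinalizedVersionRange> finalizedFeatures,
                          final int finalizedFeaturesEpoch,
                          final Features<SupportedVersionRange> supportedFeatures) {
   ```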

##########
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##########
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "request",
+  "name": "UpdateFeaturesRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+",
+      "about": "The list of updates to finalized features.", "fields": [
+      {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true,

Review comment:
       nit: missing space after `{`.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1214,6 +1215,71 @@ default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlterati
      */
     AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
 
+    /**
+     * Describes finalized as well as supported features. By default, the request is issued to any
+     * broker. It can be optionally directed only to the controller via DescribeFeaturesOptions
+     * parameter. This is particularly useful if the user requires strongly consistent reads of
+     * finalized features.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} on the future from the
+     * returned {@link DescribeFeaturesResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the describe operation could finish.</li>
+     * </ul>
+     * <p>
+     * @param options   the options to use
+     *
+     * @return          the {@link DescribeFeaturesResult} containing the result
+     */
+    DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+
+    /**
+     * Applies specified updates to finalized features. This operation is not transactional so it
+     * may succeed for some features while fail for others.
+     * <p>
+     * The API takes in a map of finalized feature name to {@link FeatureUpdate} that needs to be
+     * applied. Each entry in the map specifies the finalized feature to be added or updated or
+     * deleted, along with the new max feature version level value. This request is issued only to
+     * the controller since the API is only served by the controller. The return value contains an
+     * error code for each supplied {@link FeatureUpdate}, and the code indicates if the update
+     * succeeded or failed in the controller.
+     * <ul>
+     * <li>Downgrade of feature version level is not a regular operation/intent. It is only allowed
+     * in the controller if the {@link FeatureUpdate} has the allowDowngrade flag set - setting this
+     * flag conveys user intent to attempt downgrade of a feature max version level. Note that
+     * despite the allowDowngrade flag being set, certain downgrades may be rejected by the
+     * controller if it is deemed impossible.</li>
+     * <li>Deletion of a finalized feature version is not a regular operation/intent. It could be
+     * done by setting the allowDowngrade flag to true in the {@link FeatureUpdate}, and, setting
+     * the max version level to be less than 1.</li>
+     * </ul>
+     *<p>
+     * The following exceptions can be anticipated when calling {@code get()} on the futures
+     * obtained from the returned {@link UpdateFeaturesResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+     *   If the authenticated user didn't have alter access to the cluster.</li>
+     *   <li>{@link org.apache.kafka.common.errors.InvalidRequestException}

Review comment:
       Should this be a per-feature error or a top-level error?

##########
File path: clients/src/main/resources/common/message/UpdateFeaturesResponse.json
##########
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "response",
+  "name": "UpdateFeaturesResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [

Review comment:
       For a top-level exception such as a cluster authorization exception, we could just define a top-level error code instead of stamping every feature with the same redundant error code. I know we have been a bit inconsistent in such cases, but I personally feel having layered error codes makes the response handling clearer about whether it is a per-feature issue or a high-level issue.
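       A sketch of the layered layout (field names illustrative, following the conventions of the other response schemas):
   ```
   "fields": [
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
       "about": "The top-level error code, or 0 if there was no top-level error." },
     { "name": "Results", "type": "[]UpdatableFeatureResult", "versions": "0+",
       "about": "Per-feature update results, meaningful only when the top-level error code is 0.", "fields": [
       { "name": "Feature", "type": "string", "versions": "0+", "mapKey": true,
         "about": "The name of the finalized feature." },
       { "name": "ErrorCode", "type": "int16", "versions": "0+",
         "about": "The feature-level error code, or 0 if this update succeeded." }
     ]}
   ]
   ```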

##########
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##########
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "request",
+  "name": "UpdateFeaturesRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+",
+      "about": "The list of updates to finalized features.", "fields": [
+      {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true,
+        "about": "The name of the finalized feature to be updated."},
+      {"name": "MaxVersionLevel", "type": "int16", "versions": "0+",
+        "about": "The new maximum version level for the finalized feature. A 
value >= 1 is valid. A value < 1, is special, and can be used to request the 
deletion of the finalized feature."},
+      {"name": "AllowDowngrade", "type": "bool", "versions": "0+",

Review comment:
       Same here

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for {@link AdminClient#describeFeatures(DescribeFeaturesOptions)}
+ *
+ * The API of this class is evolving. See {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeFeaturesOptions extends AbstractOptions<DescribeFeaturesOptions> {
+
+    /**
+     * - True means the {@link Admin#describeFeatures(DescribeFeaturesOptions)} request can be
+     *   issued only to the controller.
+     * - False means the {@link Admin#describeFeatures(DescribeFeaturesOptions)} request can be

Review comment:
       `could be processed by any random broker`

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4052,6 +4058,128 @@ void handleFailure(Throwable throwable) {
         return new AlterClientQuotasResult(Collections.unmodifiableMap(futures));
     }
 
+    @Override
+    public DescribeFeaturesResult describeFeatures(final DescribeFeaturesOptions options) {
+        final KafkaFutureImpl<FeatureMetadata> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final NodeProvider provider =
+            options.sendRequestToController() ? new ControllerNodeProvider() : new LeastLoadedNodeProvider();
+
+        Call call = new Call(
+            "describeFeatures", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            @Override
+            ApiVersionsRequest.Builder createRequest(int timeoutMs) {
+                return new ApiVersionsRequest.Builder();
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final ApiVersionsResponse apiVersionsResponse = (ApiVersionsResponse) response;
+                if (apiVersionsResponse.data.errorCode() == Errors.NONE.code()) {
+                    future.complete(
+                        new FeatureMetadata(
+                            apiVersionsResponse.finalizedFeatures(),
+                            apiVersionsResponse.finalizedFeaturesEpoch(),
+                            apiVersionsResponse.supportedFeatures()));
+                } else if (options.sendRequestToController() && apiVersionsResponse.data.errorCode() == Errors.NOT_CONTROLLER.code()) {
+                    handleNotControllerError(Errors.NOT_CONTROLLER);
+                } else {
+                    future.completeExceptionally(
+                        Errors.forCode(apiVersionsResponse.data.errorCode()).exception());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(Collections.singletonList(future), throwable);
+            }
+        };
+
+        runnable.call(call, now);
+        return new DescribeFeaturesResult(future);
+    }
+
+    @Override
+    public UpdateFeaturesResult updateFeatures(
+        final Map<String, FeatureUpdate> featureUpdates, final UpdateFeaturesOptions options) {
+        if (featureUpdates == null || featureUpdates.isEmpty()) {
+            throw new IllegalArgumentException("Feature updates can not be null or empty.");
+        }
+        Objects.requireNonNull(options, "UpdateFeaturesOptions can not be null");
+
+        final Map<String, KafkaFutureImpl<Void>> updateFutures = new HashMap<>();
+        final UpdateFeaturesRequestData.FeatureUpdateKeyCollection featureUpdatesRequestData
+            = new UpdateFeaturesRequestData.FeatureUpdateKeyCollection();
+        for (Map.Entry<String, FeatureUpdate> entry : featureUpdates.entrySet()) {
+            final String feature = entry.getKey();
+            final FeatureUpdate update = entry.getValue();
+            if (feature.trim().isEmpty()) {
+                throw new IllegalArgumentException("Provided feature can not be null or empty.");
+            }
+
+            updateFutures.put(feature, new KafkaFutureImpl<>());
+            final UpdateFeaturesRequestData.FeatureUpdateKey requestItem =
+                new UpdateFeaturesRequestData.FeatureUpdateKey();
+            requestItem.setFeature(feature);
+            requestItem.setMaxVersionLevel(update.maxVersionLevel());
+            requestItem.setAllowDowngrade(update.allowDowngrade());
+            featureUpdatesRequestData.add(requestItem);
+        }
+        final UpdateFeaturesRequestData request = new UpdateFeaturesRequestData().setFeatureUpdates(featureUpdatesRequestData);
+
+        final long now = time.milliseconds();
+        final Call call = new Call("updateFeatures", calcDeadlineMs(now, options.timeoutMs()),
+            new ControllerNodeProvider()) {
+
+            @Override
+            UpdateFeaturesRequest.Builder createRequest(int timeoutMs) {
+                return new UpdateFeaturesRequest.Builder(request);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                final UpdateFeaturesResponse response =
+                    (UpdateFeaturesResponse) abstractResponse;
+
+                // Check for controller change.
+                for (UpdatableFeatureResult result : response.data().results()) {
+                    final Errors error = Errors.forCode(result.errorCode());
+                    if (error == Errors.NOT_CONTROLLER) {
+                        handleNotControllerError(error);
+                        throw error.exception();
+                    }
+                }
+
+                for (UpdatableFeatureResult result : response.data().results()) {
+                    final KafkaFutureImpl<Void> future = updateFutures.get(result.feature());
+                    if (future == null) {

Review comment:
       Does this overlap with the `completeUnrealizedFutures` check? We could keep just one of them to reduce the checking complexity.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.FinalizedVersionRange;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+
+/**
+ * Encapsulates details about finalized as well as supported features. This is particularly useful
+ * to hold the result returned by the {@link Admin#describeFeatures(DescribeFeaturesOptions)} API.
+ */
+public class FeatureMetadata {
+
+    private final Features<FinalizedVersionRange> finalizedFeatures;
+
+    private final Optional<Integer> finalizedFeaturesEpoch;
+
+    private final Features<SupportedVersionRange> supportedFeatures;
+
+    public FeatureMetadata(
+        final Features<FinalizedVersionRange> finalizedFeatures,
+        final int finalizedFeaturesEpoch,
+        final Features<SupportedVersionRange> supportedFeatures) {
+        Objects.requireNonNull(finalizedFeatures, "Provided finalizedFeatures can not be null.");
+        Objects.requireNonNull(supportedFeatures, "Provided supportedFeatures can not be null.");
+        this.finalizedFeatures = finalizedFeatures;
+        if (finalizedFeaturesEpoch >= 0) {
+            this.finalizedFeaturesEpoch = Optional.of(finalizedFeaturesEpoch);
+        } else {
+            this.finalizedFeaturesEpoch = Optional.empty();
+        }
+        this.supportedFeatures = supportedFeatures;
+    }
+
+    /**
+     * A map of finalized feature versions, with key being finalized feature name and value
+     * containing the min/max version levels for the finalized feature.
+     */
+    public Features<FinalizedVersionRange> finalizedFeatures() {
+        return finalizedFeatures;
+    }
+
+    /**
+     * The epoch for the finalized features.
+     * If the returned value is empty, it means the finalized features are absent/unavailable.
+     */
+    public Optional<Integer> finalizedFeaturesEpoch() {
+        return finalizedFeaturesEpoch;
+    }
+
+    /**
+     * A map of supported feature versions, with key being supported feature name and value
+     * containing the min/max version for the supported feature.
+     */
+    public Features<SupportedVersionRange> supportedFeatures() {
+        return supportedFeatures;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof FeatureMetadata)) {
+            return false;
+        }
+
+        final FeatureMetadata that = (FeatureMetadata) other;
+        return Objects.equals(this.finalizedFeatures, that.finalizedFeatures) &&
+            Objects.equals(this.finalizedFeaturesEpoch, that.finalizedFeaturesEpoch) &&
+            Objects.equals(this.supportedFeatures, that.supportedFeatures);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(finalizedFeatures, finalizedFeaturesEpoch, supportedFeatures);
+    }
+
+    @Override
+    public String toString() {
+        return String.format(
+            "FeatureMetadata{finalized:%s, finalizedFeaturesEpoch:%d, supported:%s}",
+            finalizedFeatures,
+            finalizedFeaturesEpoch,

Review comment:
       This won't work well with `String.format` (`%d` can't format an `Optional<Integer>`); consider using `orElse`.
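       e.g. something along these lines (the `<none>` placeholder is just illustrative):
   ```
   return String.format(
       "FeatureMetadata{finalized:%s, finalizedFeaturesEpoch:%s, supported:%s}",
       finalizedFeatures,
       finalizedFeaturesEpoch.map(Object::toString).orElse("<none>"),
       supportedFeatures);
   ```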

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for {@link AdminClient#describeFeatures(DescribeFeaturesOptions)}
+ *
+ * The API of this class is evolving. See {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeFeaturesOptions extends AbstractOptions<DescribeFeaturesOptions> {
+
+    /**
+     * - True means the {@link Admin#describeFeatures(DescribeFeaturesOptions)} request can be
+     *   issued only to the controller.
+     * - False means the {@link Admin#describeFeatures(DescribeFeaturesOptions)} request can be
+     *   issued to any random broker.
+     */
+    private boolean sendRequestToController = false;
+
+    /**
+     * Sets a flag indicating that the describe features request should be issued to the controller.

Review comment:
       Same here

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/FeatureUpdate.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Encapsulates details about an update to a finalized feature. This is 
particularly useful to
+ * define each feature update in the {@link Admin#updateFeatures(Map, 
UpdateFeaturesOptions)} API.
+ */
+public class FeatureUpdate {
+    private final short maxVersionLevel;
+    private final boolean allowDowngrade;
+
+    /**
+     * @param maxVersionLevel   the new maximum version level for the 
finalized feature.
+     *                          a value < 1 is special and indicates that the 
update is intended to
+     *                          delete the finalized feature, and should be 
accompanied by setting
+     *                          the allowDowngrade flag to true.
+     * @param allowDowngrade    - true, if this feature update was meant to 
downgrade the existing
+     *                            maximum version level of the finalized 
feature.
+     *                          - false, otherwise.
+     */
+    public FeatureUpdate(final short maxVersionLevel, final boolean 
allowDowngrade) {
+        if (maxVersionLevel < 1 && !allowDowngrade) {
+            throw new IllegalArgumentException(String.format(
+                "The allowDowngrade flag should be set when the provided 
maxVersionLevel:%d is < 1.",
+                maxVersionLevel));
+        }
+        this.maxVersionLevel = maxVersionLevel;
+        this.allowDowngrade = allowDowngrade;
+    }
+
+    public short maxVersionLevel() {
+        return maxVersionLevel;
+    }
+
+    public boolean allowDowngrade() {
+        return allowDowngrade;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        if (!(other instanceof FeatureUpdate)) {
+            return false;
+        }
+
+        final FeatureUpdate that = (FeatureUpdate) other;
+        return this.maxVersionLevel == that.maxVersionLevel && 
this.allowDowngrade == that.allowDowngrade;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(maxVersionLevel, allowDowngrade);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("FeatureUpdate{maxVersionLevel:%d, 
allowDowngrade:%s}", maxVersionLevel, allowDowngrade);
+    }
+}

Review comment:
       nit: missing newline at the end of the file

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -3193,6 +3238,104 @@ public void testListOffsetsNonRetriableErrors() throws 
Exception {
         }
     }
 
+    @Test
+    public void testUpdateFeaturesDuringSuccess() throws Exception {
+        testUpdateFeaturesDuringError(Errors.NONE);
+    }
+
+    @Test
+    public void testUpdateFeaturesInvalidRequestError() throws Exception {
+        testUpdateFeaturesDuringError(Errors.INVALID_REQUEST);
+    }
+
+    @Test
+    public void testUpdateFeaturesUpdateFailedError() throws Exception {
+        testUpdateFeaturesDuringError(Errors.FEATURE_UPDATE_FAILED);
+    }
+
+    private void testUpdateFeaturesDuringError(Errors error) throws Exception {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().prepareResponse(
+                body -> body instanceof UpdateFeaturesRequest,
+                prepareUpdateFeaturesResponse(error));
+            final KafkaFuture<Void> future = env.adminClient().updateFeatures(
+                new HashSet<>(
+                    Arrays.asList(
+                        new FeatureUpdate(
+                            "test_feature_1", (short) 2, false),
+                        new FeatureUpdate(
+                            "test_feature_2", (short) 3, true))),
+                new UpdateFeaturesOptions().timeoutMs(10000)).result();
+            if (error.exception() == null) {
+                future.get();
+            } else {
+                final ExecutionException e = 
assertThrows(ExecutionException.class,
+                    () -> future.get());
+                assertEquals(e.getCause().getClass(), 
error.exception().getClass());
+            }
+        }
+    }
+
+    @Test
+    public void testUpdateFeaturesHandleNotControllerException() throws 
Exception {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().prepareResponseFrom(
+                prepareUpdateFeaturesResponse(Errors.NOT_CONTROLLER),
+                env.cluster().nodeById(0));
+            
env.kafkaClient().prepareResponse(MetadataResponse.prepareResponse(env.cluster().nodes(),
+                env.cluster().clusterResource().clusterId(),
+                1,
+                Collections.<MetadataResponse.TopicMetadata>emptyList()));
+            env.kafkaClient().prepareResponseFrom(

Review comment:
       You are right, it seems unnecessary.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.message.UpdateFeaturesResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+
+/**
+ * Possible error codes:
+ *
+ *   - {@link Errors#CLUSTER_AUTHORIZATION_FAILED}
+ *   - {@link Errors#NOT_CONTROLLER}
+ *   - {@link Errors#INVALID_REQUEST}
+ *   - {@link Errors#FEATURE_UPDATE_FAILED}
+ */
+public class UpdateFeaturesResponse extends AbstractResponse {
+
+    private final UpdateFeaturesResponseData data;
+
+    public UpdateFeaturesResponse(UpdateFeaturesResponseData data) {
+        this.data = data;
+    }
+
+    public UpdateFeaturesResponse(Struct struct) {
+        final short latestVersion = (short) 
(UpdateFeaturesResponseData.SCHEMAS.length - 1);
+        this.data = new UpdateFeaturesResponseData(struct, latestVersion);
+    }
+
+    public UpdateFeaturesResponse(Struct struct, short version) {
+        this.data = new UpdateFeaturesResponseData(struct, version);
+    }
+
+    public Map<String, ApiError> errors() {
+        return data.results().valuesSet().stream().collect(
+            Collectors.toMap(
+                result -> result.feature(),

Review comment:
       nit: `result -> result.feature()` could be replaced with a method 
reference
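
       For instance (a sketch; it assumes the generated element type is named 
`UpdatableFeatureResult` and exposes `feature()`, `errorCode()` and 
`errorMessage()` accessors, per the message codegen conventions):

```java
public Map<String, ApiError> errors() {
    return data.results().valuesSet().stream().collect(
        Collectors.toMap(
            UpdatableFeatureResult::feature,  // method reference instead of the lambda
            result -> new ApiError(Errors.forCode(result.errorCode()), result.errorMessage())));
}
```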

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##########
@@ -143,7 +172,13 @@ public static ApiVersionsResponse apiVersionsResponse(
         Features<FinalizedVersionRange> finalizedFeatures,
         int finalizedFeaturesEpoch) {
         if (maxMagic == RecordBatch.CURRENT_MAGIC_VALUE && throttleTimeMs == 
DEFAULT_THROTTLE_TIME) {
-            return DEFAULT_API_VERSIONS_RESPONSE;
+            return new ApiVersionsResponse(createApiVersionsResponseData(
+                DEFAULT_API_VERSIONS_RESPONSE.throttleTimeMs(),
+                
Errors.forCode(DEFAULT_API_VERSIONS_RESPONSE.data().errorCode()),
+                DEFAULT_API_VERSIONS_RESPONSE.data().apiKeys(),
+                latestSupportedFeatures,
+                finalizedFeatures,
+                finalizedFeaturesEpoch));

Review comment:
       Comment here since no better place: createApiVersionsResponse on L198 
could be made private

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -3615,6 +3662,137 @@ public void testListOffsetsNonRetriableErrors() throws 
Exception {
         }
     }
 
+    @Test
+    public void testUpdateFeaturesDuringSuccess() throws Exception {
+        testUpdateFeatures(
+            makeTestFeatureUpdates(),
+            makeTestFeatureUpdateErrors(Errors.NONE));
+    }
+
+    @Test
+    public void testUpdateFeaturesInvalidRequestError() throws Exception {
+        testUpdateFeatures(
+            makeTestFeatureUpdates(),
+            makeTestFeatureUpdateErrors(Errors.INVALID_REQUEST));
+    }
+
+    @Test
+    public void testUpdateFeaturesUpdateFailedError() throws Exception {
+        testUpdateFeatures(
+            makeTestFeatureUpdates(),
+            makeTestFeatureUpdateErrors(Errors.FEATURE_UPDATE_FAILED));
+    }
+
+    @Test
+    public void testUpdateFeaturesPartialSuccess() throws Exception {
+        final Map<String, Errors> errors = 
makeTestFeatureUpdateErrors(Errors.NONE);
+        errors.put("test_feature_2", Errors.INVALID_REQUEST);
+        testUpdateFeatures(makeTestFeatureUpdates(), errors);
+    }
+
+    private Map<String, FeatureUpdate> makeTestFeatureUpdates() {
+        return Utils.mkMap(
+            Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2, 
false)),
+            Utils.mkEntry("test_feature_2", new FeatureUpdate((short) 3, 
true)));
+    }
+
+    private Map<String, Errors> makeTestFeatureUpdateErrors(final Errors 
error) {
+        final Map<String, FeatureUpdate> updates = makeTestFeatureUpdates();
+        final Map<String, Errors> errors = new HashMap<>();
+        for (Map.Entry<String, FeatureUpdate> entry : updates.entrySet()) {
+            errors.put(entry.getKey(), error);
+        }
+        return errors;
+    }
+
+    private void testUpdateFeatures(Map<String, FeatureUpdate> featureUpdates,
+                                    Map<String, Errors> featureUpdateErrors) 
throws Exception {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().prepareResponse(
+                body -> body instanceof UpdateFeaturesRequest,
+                prepareUpdateFeaturesResponse(featureUpdateErrors));
+            final Map<String, KafkaFuture<Void>> futures = 
env.adminClient().updateFeatures(
+                featureUpdates,
+                new UpdateFeaturesOptions().timeoutMs(10000)).values();
+            for (Map.Entry<String, KafkaFuture<Void>> entry : 
futures.entrySet()) {
+                final KafkaFuture<Void> future = entry.getValue();
+                final Errors error = featureUpdateErrors.get(entry.getKey());
+                if (error == Errors.NONE) {
+                    future.get();
+                } else {
+                    final ExecutionException e = 
assertThrows(ExecutionException.class,
+                        () -> future.get());

Review comment:
       nit: could use a method reference instead of the `() -> future.get()` 
lambda
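
       e.g. a one-line sketch of the same assertion:

```java
final ExecutionException e = assertThrows(ExecutionException.class, future::get);
```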

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -266,6 +275,178 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+    info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+    zkClient.createFeatureZNode(newNode)
+    val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+    newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+    info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+    zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (String) and a range of 
versions (defined by a
+   * {@link SupportedVersionRange}). It refers to a feature that a particular 
broker advertises
+   * support for. Each broker advertises the version ranges of its own 
supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (String) and a range of 
version levels (defined
+   * by a {@link FinalizedVersionRange}). Whenever the feature versioning 
system (KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *    A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *    setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *    the possible supported features finalized immediately. Assuming this 
is the case, the
+   *    controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *    it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *    default supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *    Imagine there is an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *    broker binary has been upgraded to a newer version that supports the 
feature versioning
+   *    system (KIP-584). This means the user is upgrading from an earlier 
version of the broker
+   *    binary. In this case, we want to start with no finalized features and 
allow the user to
+   *    finalize them whenever they are ready i.e. in the future whenever the 
user sets IBP config
+   *    to be greater than or equal to KAFKA_2_7_IV0, then the user could 
start finalizing the
+   *    features. This process ensures we do not enable all the possible 
features immediately after
+   *    an upgrade, which could be harmful to Kafka.
+   *    This is how we handle such a case:
+   *      - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *        controller will start up and check if the FeatureZNode is absent. 
If absent, it will
+   *        react by creating a FeatureZNode with disabled status and empty 
finalized features.
+   *        Otherwise, if a node already exists in enabled status then the 
controller will just
+   *        flip the status to disabled and clear the finalized features.
+   *      - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *        KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *        and whether it is disabled. In such a case, it won’t upgrade all 
features immediately.
+   *        Instead it will just switch the FeatureZNode status to enabled 
status. This lets the
+   *        user finalize the features later.
+   *
+   * 3. Broker binary upgraded, with existing cluster IBP config >= 
KAFKA_2_7_IV0:
+   *    Imagine an existing Kafka cluster with IBP config >= KAFKA_2_7_IV0, 
and the broker binary
+   *    has just been upgraded to a newer version (that supports IBP config 
KAFKA_2_7_IV0 and higher).
+   *    The controller will start up and find that a FeatureZNode is already 
present with enabled
+   *    status and existing finalized features. In such a case, the controller 
needs to scan the
+   *    existing finalized features and mutate them for the purpose of version 
level deprecation
+   *    (if needed).
+   *    This is how we handle this case: If an existing finalized feature is 
present in the default
+   *    finalized features, then, its existing minimum version level is 
updated to the default
+   *    minimum version level maintained in the BrokerFeatures object. The 
goal of this mutation is
+   *    to permanently deprecate one or more feature version levels. The range 
of feature version
+   *    levels deprecated are from the closed range: 
[existing_min_version_level, default_min_version_level].
+   *    NOTE: Deprecating a feature version level is an incompatible change, 
which requires a major
+   *    release of Kafka. In such a release, the minimum version level 
maintained within the
+   *    BrokerFeatures class is updated suitably to record the deprecation of 
the feature.
+   *
+   * 4. Broker downgrade:
+   *    Imagine that a Kafka cluster exists already and the IBP config is 
greater than or equal to
+   *    KAFKA_2_7_IV0. Then, the user decided to downgrade the cluster by 
setting IBP config to a
+   *    value less than KAFKA_2_7_IV0. This means the user is also disabling 
the feature versioning
+   *    system (KIP-584). In this case, when the controller starts up with the 
lower IBP config, it
+   *    will switch the FeatureZNode status to disabled with empty features.
+   */
+  private def enableFeatureVersioning(): Unit = {
+    val defaultFinalizedFeatures = brokerFeatures.getDefaultFinalizedFeatures
+    val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(FeatureZNode.path)
+    if (version == ZkVersion.UnknownVersion) {
+      val newVersion = createFeatureZNode(new 
FeatureZNode(FeatureZNodeStatus.Enabled, defaultFinalizedFeatures))
+      featureCache.waitUntilEpochOrThrow(newVersion, 
config.zkConnectionTimeoutMs)
+    } else {
+      val existingFeatureZNode = 
FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+      var newFeatures: Features[FinalizedVersionRange] = 
Features.emptyFinalizedFeatures()
+      if (existingFeatureZNode.status.equals(FeatureZNodeStatus.Enabled)) {
+        newFeatures = 
Features.finalizedFeatures(existingFeatureZNode.features.features().asScala.map 
{
+          case (featureName, existingVersionRange) =>
+            val brokerDefaultVersionRange = 
defaultFinalizedFeatures.get(featureName)
+            if (brokerDefaultVersionRange == null) {
+              warn(s"Existing finalized feature: $featureName with 
$existingVersionRange"
+                + s" is absent in default finalized $defaultFinalizedFeatures")
+              (featureName, existingVersionRange)
+            } else if (brokerDefaultVersionRange.max() >= 
existingVersionRange.max() &&
+                       brokerDefaultVersionRange.min() <= 
existingVersionRange.max()) {
+              // Through this change, we deprecate all version levels in the 
closed range:
+              // [existingVersionRange.min(), brokerDefaultVersionRange.min() 
- 1]
+              (featureName, new 
FinalizedVersionRange(brokerDefaultVersionRange.min(), 
existingVersionRange.max()))
+            } else {
+              // If the existing version levels fall completely outside the

Review comment:
       Are we good to proceed in this case? When there is no overlap between 
the broker default features and the remote finalized features, is the current 
controller still eligible?

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -3615,6 +3662,137 @@ public void testListOffsetsNonRetriableErrors() throws 
Exception {
         }
     }
 
+    @Test
+    public void testUpdateFeaturesDuringSuccess() throws Exception {
+        testUpdateFeatures(
+            makeTestFeatureUpdates(),
+            makeTestFeatureUpdateErrors(Errors.NONE));
+    }
+
+    @Test
+    public void testUpdateFeaturesInvalidRequestError() throws Exception {
+        testUpdateFeatures(
+            makeTestFeatureUpdates(),
+            makeTestFeatureUpdateErrors(Errors.INVALID_REQUEST));
+    }
+
+    @Test
+    public void testUpdateFeaturesUpdateFailedError() throws Exception {
+        testUpdateFeatures(
+            makeTestFeatureUpdates(),
+            makeTestFeatureUpdateErrors(Errors.FEATURE_UPDATE_FAILED));
+    }
+
+    @Test
+    public void testUpdateFeaturesPartialSuccess() throws Exception {
+        final Map<String, Errors> errors = 
makeTestFeatureUpdateErrors(Errors.NONE);
+        errors.put("test_feature_2", Errors.INVALID_REQUEST);
+        testUpdateFeatures(makeTestFeatureUpdates(), errors);
+    }
+
+    private Map<String, FeatureUpdate> makeTestFeatureUpdates() {
+        return Utils.mkMap(
+            Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2, 
false)),
+            Utils.mkEntry("test_feature_2", new FeatureUpdate((short) 3, 
true)));
+    }
+
+    private Map<String, Errors> makeTestFeatureUpdateErrors(final Errors 
error) {
+        final Map<String, FeatureUpdate> updates = makeTestFeatureUpdates();

Review comment:
       Could we pass `updates` in as a parameter, to avoid calling 
`makeTestFeatureUpdates` twice?
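
       A sketch of that refactor (same names as in the patch, only the 
signature changes):

```java
private Map<String, Errors> makeTestFeatureUpdateErrors(final Map<String, FeatureUpdate> updates,
                                                        final Errors error) {
    final Map<String, Errors> errors = new HashMap<>();
    for (final String feature : updates.keySet()) {
        errors.put(feature, error);
    }
    return errors;
}
```

       Callers would then build the updates map once, e.g. 
`testUpdateFeatures(updates, makeTestFeatureUpdateErrors(updates, Errors.NONE))`.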

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2956,6 +2959,37 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleUpdateFeatures(request: RequestChannel.Request): Unit = {
+    val updateFeaturesRequest = request.body[UpdateFeaturesRequest]
+    def featureUpdateErrors(error: Errors, msgOverride: Option[String]): 
Map[String, ApiError] = {
+      updateFeaturesRequest.data().featureUpdates().asScala.map(
+        update => update.feature() -> new ApiError(error, 
msgOverride.getOrElse(error.message()))
+      ).toMap
+    }
+
+    def sendResponseCallback(updateErrors: Map[String, ApiError]): Unit = {
+      val results = new UpdatableFeatureResultCollection()

Review comment:
       Could be moved to `UpdateFeaturesResponse` as a utility.
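
       A rough sketch of such a utility on `UpdateFeaturesResponse` (the 
setter names are assumptions based on the usual message codegen conventions):

```java
public static UpdateFeaturesResponse createWithErrors(final Map<String, ApiError> updateErrors) {
    final UpdatableFeatureResultCollection results = new UpdatableFeatureResultCollection();
    updateErrors.forEach((feature, error) -> results.add(
        new UpdatableFeatureResult()
            .setFeature(feature)
            .setErrorCode(error.error().code())
            .setErrorMessage(error.message())));
    return new UpdateFeaturesResponse(new UpdateFeaturesResponseData().setResults(results));
}
```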

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -266,6 +275,179 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+    info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+    zkClient.createFeatureZNode(newNode)
+    val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+    newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+    info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+    zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (String) and a range of 
versions (defined by a
+   * {@link SupportedVersionRange}). It refers to a feature that a particular 
broker advertises
+   * support for. Each broker advertises the version ranges of its own 
supported features in its
+   * own BrokerIdZnode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (String) and a range of 
version levels (defined
+   * by a {@link FinalizedVersionRange}). Whenever the feature versioning 
system (KIP-584) is
+   * enabled, the finalized features are stored in ZK in the cluster-wide 
common FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the one and 
only entity modifying
+   * the information about finalized features and their version levels.
+   *
+   * This method sets up the FeatureZNode with enabled status. This status 
means the feature
+   * versioning system (KIP-584) is enabled, and, the finalized features 
stored in the FeatureZNode
+   * are active. This status should be written by the controller to the 
FeatureZNode only when the
+   * broker IBP config is greater than or equal to KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *    A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *    setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *    the possible supported features finalized immediately. Assuming this 
is the case, the
+   *    controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *    it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *    default supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *    Imagine there is an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *    Broker binary has been upgraded to a newer version that supports the 
feature versioning
+   *    system (KIP-584). This means the user is upgrading from an earlier 
version of the Broker
+   *    binary. In this case, we want to start with no finalized features and 
allow the user to
+   *    finalize them whenever they are ready i.e. in the future whenever the 
user sets IBP config
+   *    to be greater than or equal to KAFKA_2_7_IV0, then the user could 
start finalizing the
+   *    features. The reason to do this is that enabling all the possible 
features immediately after
+   *    an upgrade could be harmful to the cluster.
+   *    This is how we handle such a case:
+   *      - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *        controller will start up and check if the FeatureZNode is absent. 
If absent, then it
+   *        will react by creating a FeatureZNode with disabled status and 
empty finalized features.
+   *        Otherwise, if a node already exists in enabled status then the 
controller will just
+   *        flip the status to disabled and clear the finalized features.
+   *      - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *        KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *        and whether it is disabled. In such a case, it won’t upgrade all 
features immediately.
+   *        Instead it will just switch the FeatureZNode status to enabled 
status. This lets the
+   *        user finalize the features later.
+   *
+   * 3. Broker binary upgraded, with existing cluster IBP config >= 
KAFKA_2_7_IV0:
+   *    Imagine an existing Kafka cluster with IBP config >= KAFKA_2_7_IV0, 
and the broker binary
+   *    has just been upgraded to a newer version (that supports IBP config 
KAFKA_2_7_IV0 and higher).
+   *    The controller will start up and find that a FeatureZNode is already 
present with enabled
+   *    status and existing finalized features. In such a case, the controller 
needs to scan the
+   *    existing finalized features and mutate them for the purpose of version 
level deprecation
+   *    (if needed).
+   *    This is how we handle this case: If an existing finalized feature is 
present in the default
+   *    finalized features, then, its existing minimum version level is 
updated to the default
+   *    minimum version level maintained in the BrokerFeatures object. The 
goal of this mutation is
+   *    to permanently deprecate one or more feature version levels. The range 
of feature version
+   *    levels deprecated are from the closed range: 
[existing_min_version_level, default_min_version_level].
+   *    NOTE: Deprecating a feature version level is an incompatible change, 
which requires a major
+   *    release of Kafka. In such a release, the minimum version level 
maintained within the
+   *    BrokerFeatures class is updated suitably to record the deprecation of 
the feature.
+   *
+   * 4. Broker downgrade:
+   *    Imagine that a Kafka cluster exists already and the IBP config is 
greater than or equal to
+   *    KAFKA_2_7_IV0. Then, the user decided to downgrade the cluster by 
setting IBP config to a
+   *    value less than KAFKA_2_7_IV0. This means the user is also disabling 
the feature versioning
+   *    system (KIP-584). In this case, when the controller starts up with the 
lower IBP config, it
+   *    will switch the FeatureZNode status to disabled with empty features.
+   */
+  private def enableFeatureVersioning(): Unit = {
+    val defaultFinalizedFeatures = brokerFeatures.getDefaultFinalizedFeatures
+    val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(FeatureZNode.path)
+    if (version == ZkVersion.UnknownVersion) {
+      val newVersion = createFeatureZNode(new 
FeatureZNode(FeatureZNodeStatus.Enabled, defaultFinalizedFeatures))
+      featureCache.waitUntilEpochOrThrow(newVersion, 
config.zkConnectionTimeoutMs)
+    } else {
+      val existingFeatureZNode = 
FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+      var newFeatures: Features[FinalizedVersionRange] = 
Features.emptyFinalizedFeatures()
+      if (existingFeatureZNode.status.equals(FeatureZNodeStatus.Enabled)) {
+        newFeatures = 
Features.finalizedFeatures(existingFeatureZNode.features.features().asScala.map 
{
+          case (featureName, existingVersionRange) => {
+            val brokerDefaultVersionRange = 
defaultFinalizedFeatures.get(featureName)
+            if (brokerDefaultVersionRange == null) {
+              warn(s"Existing finalized feature: $featureName with 
$existingVersionRange"
+                + s" is absent in default finalized $defaultFinalizedFeatures")
+              (featureName, existingVersionRange)
+            } else if (existingVersionRange.max() >= 
brokerDefaultVersionRange.min() &&
+                       brokerDefaultVersionRange.max() >= 
existingVersionRange.max()) {
+              // Through this change, we deprecate all version levels in the 
closed range:
+              // [existingVersionRange.min(), brokerDefaultVersionRange.min() 
- 1]
+              (featureName, new 
FinalizedVersionRange(brokerDefaultVersionRange.min(), 
existingVersionRange.max()))
+            } else {
+              // If the existing version levels fall completely outside the
+              // range of the default finalized version levels (i.e. no 
intersection), or, if the
+              // existing version levels are ineligible for a modification 
since they are
+              // incompatible with default finalized version levels, then we 
skip the update.
+              warn(s"Can not update minimum version level in finalized 
feature: $featureName,"
+                + s" since the existing $existingVersionRange is not eligible 
for a change"
+                + s" based on the default $brokerDefaultVersionRange.")
+              (featureName, existingVersionRange)
+            }
+          }
+        }.asJava)
+      }
+      val newFeatureZNode = new FeatureZNode(FeatureZNodeStatus.Enabled, 
newFeatures)
+      if (!newFeatureZNode.equals(existingFeatureZNode)) {

Review comment:
       I see, still wondering if we could just check whether `newFeatures` is 
equal to `existingFeatureZNode.features`

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1647,6 +1844,185 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns a suitable error.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return         the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+    if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+      throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+    }

Review comment:
       nit: add a blank line after the `throw` block

##########
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##########
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "request",
+  "name": "UpdateFeaturesRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+",
+      "about": "The list of updates to finalized features.", "fields": [
+      {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true,
+        "about": "The name of the finalized feature to be updated."},
+      {"name": "MaxVersionLevel", "type": "int16", "versions": "0+",
+        "about": "The new maximum version level for the finalized feature. A 
value >= 1 is valid. A value < 1, is special, and can be used to request the 
deletion of the finalized feature."},
+      {"name": "AllowDowngrade", "type": "bool", "versions": "0+",
+        "about": "When set to true, the finalized feature version level is 
allowed to be downgraded/deleted."}

Review comment:
       Should we also mention that setting this flag fails the request when 
the update is not actually a downgrade?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/errors/FeatureUpdateFailedException.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class FeatureUpdateFailedException extends ApiException {

Review comment:
       Do we need to make this a public error? It seems to be used only 
internally, so it could be kept internal if we don't intend to let users 
catch it.

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -266,6 +275,178 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+    info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+    zkClient.createFeatureZNode(newNode)
+    val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+    newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+    info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+    zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (String) and a range of 
versions (defined by a
+   * {@link SupportedVersionRange}). It refers to a feature that a particular 
broker advertises
+   * support for. Each broker advertises the version ranges of its own 
supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (String) and a range of 
version levels (defined
+   * by a {@link FinalizedVersionRange}). Whenever the feature versioning 
system (KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *    A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *    setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *    the possible supported features finalized immediately. Assuming this 
is the case, the
+   *    controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *    it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *    default supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *    Imagine there is an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *    broker binary has been upgraded to a newer version that supports the 
feature versioning
+   *    system (KIP-584). This means the user is upgrading from an earlier 
version of the broker
+   *    binary. In this case, we want to start with no finalized features and 
allow the user to
+   *    finalize them whenever they are ready i.e. in the future whenever the 
user sets IBP config
+   *    to be greater than or equal to KAFKA_2_7_IV0, then the user could 
start finalizing the
+   *    features. This process ensures we do not enable all the possible 
features immediately after
+   *    an upgrade, which could be harmful to Kafka.
+   *    This is how we handle such a case:
+   *      - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *        controller will start up and check if the FeatureZNode is absent. 
If absent, it will
+   *        react by creating a FeatureZNode with disabled status and empty 
finalized features.
+   *        Otherwise, if a node already exists in enabled status then the 
controller will just
+   *        flip the status to disabled and clear the finalized features.
+   *      - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *        KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *        and whether it is disabled. In such a case, it won’t upgrade all 
features immediately.
+   *        Instead it will just switch the FeatureZNode status to enabled 
status. This lets the
+   *        user finalize the features later.
+   *
+   * 3. Broker binary upgraded, with existing cluster IBP config >= 
KAFKA_2_7_IV0:
+   *    Imagine an existing Kafka cluster with IBP config >= KAFKA_2_7_IV0, 
and the broker binary
+   *    has just been upgraded to a newer version (that supports IBP config 
KAFKA_2_7_IV0 and higher).
+   *    The controller will start up and find that a FeatureZNode is already 
present with enabled
+   *    status and existing finalized features. In such a case, the controller 
needs to scan the
+   *    existing finalized features and mutate them for the purpose of version 
level deprecation
+   *    (if needed).
+   *    This is how we handle this case: If an existing finalized feature is 
present in the default
+   *    finalized features, then, its existing minimum version level is 
updated to the default
+   *    minimum version level maintained in the BrokerFeatures object. The 
goal of this mutation is
+   *    to permanently deprecate one or more feature version levels. The range 
of feature version
+   *    levels deprecated are from the closed range: 
[existing_min_version_level, default_min_version_level].
+   *    NOTE: Deprecating a feature version level is an incompatible change, 
which requires a major
+   *    release of Kafka. In such a release, the minimum version level 
maintained within the
+   *    BrokerFeatures class is updated suitably to record the deprecation of 
the feature.
+   *
+   * 4. Broker downgrade:
+   *    Imagine that a Kafka cluster exists already and the IBP config is 
greater than or equal to
+   *    KAFKA_2_7_IV0. Then, the user decided to downgrade the cluster by 
setting IBP config to a
+   *    value less than KAFKA_2_7_IV0. This means the user is also disabling 
the feature versioning
+   *    system (KIP-584). In this case, when the controller starts up with the 
lower IBP config, it
+   *    will switch the FeatureZNode status to disabled with empty features.
+   */
+  private def enableFeatureVersioning(): Unit = {
+    val defaultFinalizedFeatures = brokerFeatures.getDefaultFinalizedFeatures
+    val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(FeatureZNode.path)
+    if (version == ZkVersion.UnknownVersion) {
+      val newVersion = createFeatureZNode(new 
FeatureZNode(FeatureZNodeStatus.Enabled, defaultFinalizedFeatures))
+      featureCache.waitUntilEpochOrThrow(newVersion, 
config.zkConnectionTimeoutMs)
+    } else {
+      val existingFeatureZNode = 
FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+      var newFeatures: Features[FinalizedVersionRange] = 
Features.emptyFinalizedFeatures()
+      if (existingFeatureZNode.status.equals(FeatureZNodeStatus.Enabled)) {
+        newFeatures = 
Features.finalizedFeatures(existingFeatureZNode.features.features().asScala.map 
{
+          case (featureName, existingVersionRange) =>
+            val brokerDefaultVersionRange = 
defaultFinalizedFeatures.get(featureName)
+            if (brokerDefaultVersionRange == null) {
+              warn(s"Existing finalized feature: $featureName with 
$existingVersionRange"
+                + s" is absent in default finalized $defaultFinalizedFeatures")
+              (featureName, existingVersionRange)
+            } else if (brokerDefaultVersionRange.max() >= 
existingVersionRange.max() &&
+                       brokerDefaultVersionRange.min() <= 
existingVersionRange.max()) {
+              // Through this change, we deprecate all version levels in the 
closed range:
+              // [existingVersionRange.min(), brokerDefaultVersionRange.min() 
- 1]
+              (featureName, new 
FinalizedVersionRange(brokerDefaultVersionRange.min(), 
existingVersionRange.max()))
+            } else {
+              // If the existing version levels fall completely outside the
+              // range of the default finalized version levels (i.e. no 
intersection), or, if the
+              // existing version levels are ineligible for a modification 
since they are
+              // incompatible with default finalized version levels, then we 
skip the update.
+              warn(s"Can not update minimum version level in finalized 
feature: $featureName,"
+                + s" since the existing $existingVersionRange is not eligible 
for a change"
+                + s" based on the default $brokerDefaultVersionRange.")
+              (featureName, existingVersionRange)
+            }
+        }.asJava)
+      }
+      val newFeatureZNode = new FeatureZNode(FeatureZNodeStatus.Enabled, 
newFeatures)
+      if (!newFeatureZNode.equals(existingFeatureZNode)) {
+        val newVersion = updateFeatureZNode(newFeatureZNode)
+        featureCache.waitUntilEpochOrThrow(newVersion, 
config.zkConnectionTimeoutMs)
+      }
+    }
+  }
+
+  /**
+   * Disables the feature versioning system (KIP-584).
+   *
+   * Sets up the FeatureZNode with disabled status. This status means the 
feature versioning system
+   * (KIP-584) is disabled, and, the finalized features stored in the 
FeatureZNode are not relevant.
+   * This status should be written by the controller to the FeatureZNode only 
when the broker
+   * IBP config is less than KAFKA_2_7_IV0.
+   *
+   * NOTE:
+   * 1. When this method returns, existing finalized features (if any) will be 
cleared from the
+   *    FeatureZNode.
+   * 2. This method, unlike enableFeatureVersioning() need not wait for the 
FinalizedFeatureCache
+   *    to be updated, because, such updates to the cache (via 
FinalizedFeatureChangeListener)
+   *    are disabled when IBP config is < than KAFKA_2_7_IV0.
+   */
+  private def disableFeatureVersioning(): Unit = {
+    val newNode = FeatureZNode(FeatureZNodeStatus.Disabled, 
Features.emptyFinalizedFeatures())
+    val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(FeatureZNode.path)
+    if (version == ZkVersion.UnknownVersion) {
+      createFeatureZNode(newNode)

Review comment:
       Do we need to call `featureCache.waitUntilEpochOrThrow(newVersion, 
config.zkConnectionTimeoutMs)` here, using the version returned by 
`createFeatureZNode`, to ensure the update is successful?

##########
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##########
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, 
SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the following:
+ *
+ * 1. The latest features supported by the Broker.
+ *
+ * 2. The default minimum version levels for specific features. This map 
enables feature
+ *    version level deprecation. This is how it works: in order to deprecate 
feature version levels,
+ *    in this map the default minimum version level of a feature can be set to 
a new value that's
+ *    higher than 1 (let's call this latest_min_version_level). In doing so, 
the feature version levels
+ *    in the closed range: [1, latest_min_version_level - 1] get deprecated by 
the controller logic
+ *    that applies this map to persistent finalized feature state in ZK (this 
mutation happens
+ *    during controller election and during finalized feature updates via the
+ *    ApiKeys.UPDATE_FINALIZED_FEATURES api). This will automatically mean 
external clients of Kafka
+ *    would need to stop using the finalized min version levels that have been 
deprecated.
+ *
+ * This class also provides APIs to check for incompatibilities between the 
features supported by
+ * the Broker and finalized features. This class is immutable in production. 
It provides few APIs to
+ * mutate state only for the purpose of testing.
+ */
+class BrokerFeatures private (@volatile var supportedFeatures: 
Features[SupportedVersionRange],
+                              @volatile var defaultFeatureMinVersionLevels: 
Map[String, Short]) {
+  require(BrokerFeatures.areFeatureMinVersionLevelsCompatible(
+    supportedFeatures, defaultFeatureMinVersionLevels))
+
+  // For testing only.
+  def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit 
= {
+    require(
+      BrokerFeatures.areFeatureMinVersionLevelsCompatible(newFeatures, 
defaultFeatureMinVersionLevels))
+    supportedFeatures = newFeatures
+  }
+
+  /**
+   * Returns the default minimum version level for a specific feature.
+   *
+   * @param feature   the name of the feature
+   *
+   * @return          the default minimum version level for the feature if its 
defined.
+   *                  otherwise, returns 1.
+   */
+  def defaultMinVersionLevel(feature: String): Short = {
+    defaultFeatureMinVersionLevels.getOrElse(feature, 1)
+  }
+
+  // For testing only.
+  def setDefaultMinVersionLevels(newMinVersionLevels: Map[String, Short]): 
Unit = {
+    require(
+      BrokerFeatures.areFeatureMinVersionLevelsCompatible(supportedFeatures, 
newMinVersionLevels))
+    defaultFeatureMinVersionLevels = newMinVersionLevels
+  }
+
+  /**
+   * Returns the default finalized features that a new Kafka cluster with IBP 
config >= KAFKA_2_7_IV0
+   * needs to be bootstrapped with.
+   */
+  def getDefaultFinalizedFeatures: Features[FinalizedVersionRange] = {
+    Features.finalizedFeatures(
+      supportedFeatures.features.asScala.map {
+        case(name, versionRange) => (
+          name, new FinalizedVersionRange(defaultMinVersionLevel(name), 
versionRange.max))
+      }.asJava)
+  }
+
+  /**
+   * Returns the set of feature names found to be incompatible.
+   * A feature incompatibility is a version mismatch between the latest 
feature supported by the
+   * Broker, and the provided finalized feature. This can happen because a 
provided finalized
+   * feature:
+   *  1) Does not exist in the Broker (i.e. it is unknown to the Broker).
+   *           [OR]
+   *  2) Exists but the FinalizedVersionRange does not match with the
+   *     supported feature's SupportedVersionRange.
+   *
+   * @param finalized   The finalized features against which incompatibilities 
need to be checked for.
+   *
+   * @return            The subset of input features which are incompatible. 
If the returned object
+   *                    is empty, it means there were no feature 
incompatibilities found.
+   */
+  def incompatibleFeatures(finalized: Features[FinalizedVersionRange]): 
Features[FinalizedVersionRange] = {
+    BrokerFeatures.incompatibleFeatures(supportedFeatures, finalized, 
logIncompatibilities = true)
+  }
+}
+
+object BrokerFeatures extends Logging {
+
+  def createDefault(): BrokerFeatures = {
+    // The arguments are currently empty, but, in the future as we define 
features we should
+    // populate the required values here.
+    new BrokerFeatures(emptySupportedFeatures, Map[String, Short]())
+  }
+
+  /**
+   * Returns true if any of the provided finalized features are incompatible 
with the provided
+   * supported features.
+   *
+   * @param supportedFeatures   The supported features to be compared
+   * @param finalizedFeatures   The finalized features to be compared
+   *
+   * @return                    - True if there are any feature 
incompatibilities found.
+   *                            - False otherwise.
+   */
+  def hasIncompatibleFeatures(supportedFeatures: 
Features[SupportedVersionRange],
+                              finalizedFeatures: 
Features[FinalizedVersionRange]): Boolean = {
+    !incompatibleFeatures(supportedFeatures, finalizedFeatures, false).empty
+  }
+
+  private def incompatibleFeatures(supportedFeatures: 
Features[SupportedVersionRange],
+                                   finalizedFeatures: 
Features[FinalizedVersionRange],
+                                   logIncompatibilities: Boolean): 
Features[FinalizedVersionRange] = {
+    val incompatibleFeaturesInfo = finalizedFeatures.features.asScala.map {
+      case (feature, versionLevels) =>
+        val supportedVersions = supportedFeatures.get(feature)
+        if (supportedVersions == null) {
+          (feature, versionLevels, "{feature=%s, reason='Unsupported 
feature'}".format(feature))
+        } else if (versionLevels.isIncompatibleWith(supportedVersions)) {
+          (feature, versionLevels, "{feature=%s, reason='%s is incompatible 
with %s'}".format(
+            feature, versionLevels, supportedVersions))
+        } else {
+          (feature, versionLevels, null)
+        }
+    }.filter{ case(_, _, errorReason) => errorReason != null}.toList
+
+    if (logIncompatibilities && incompatibleFeaturesInfo.nonEmpty) {
+      warn(
+        "Feature incompatibilities seen: " + incompatibleFeaturesInfo.map {
+          case(_, _, errorReason) => errorReason })
+    }
+    Features.finalizedFeatures(incompatibleFeaturesInfo.map {
+      case(feature, versionLevels, _) => (feature, versionLevels) 
}.toMap.asJava)
+  }
+
+  /**
+   * A check that ensures each feature defined with min version level is a 
supported feature, and
+   * the min version level value is valid (i.e. it is compatible with the 
supported version range).
+   *
+   * @param supportedFeatures         the supported features
+   * @param featureMinVersionLevels   the feature minimum version levels
+   *
+   * @return                          - true, if the above described check 
passes.
+   *                                  - false, otherwise.
+   */
+  private def areFeatureMinVersionLevelsCompatible(
+    supportedFeatures: Features[SupportedVersionRange],
+    featureMinVersionLevels: Map[String, Short]
+  ): Boolean = {
+    featureMinVersionLevels.forall {
+      case(featureName, minVersionLevel) =>
+        val supportedFeature = supportedFeatures.get(featureName)
+        (supportedFeature != null) &&
+          !new FinalizedVersionRange(minVersionLevel, supportedFeature.max())

Review comment:
       Could we get a static method instead of instantiating a new `FinalizedVersionRange` for the comparison every time?
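
    For illustration, a static helper along these lines could avoid the per-feature allocation (just a sketch; the helper name and the exact bounds check are assumptions, not code from this PR):

```scala
object BrokerFeatures extends Logging {
  // Hypothetical static check: compare the min version level directly against the
  // supported range instead of allocating a FinalizedVersionRange for each feature.
  private def isCompatibleMinVersionLevel(minVersionLevel: Short,
                                          supported: SupportedVersionRange): Boolean =
    minVersionLevel >= supported.min && minVersionLevel <= supported.max
}
```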

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1647,6 +1844,188 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no feature
+   * incompatibilities seen with all known brokers for the provided feature update.
+   * Otherwise returns a suitable error.

Review comment:
       State the error explicitly here.
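
    e.g. something like (suggested wording only, based on the error the method actually returns below):

```scala
   * Otherwise returns an ApiError(Errors.INVALID_REQUEST), e.g. describing how many
   * brokers were found to have incompatible features.
```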

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1647,6 +1844,188 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no feature
+   * incompatibilities seen with all known brokers for the provided feature update.
+   * Otherwise returns a suitable error.
+   *
+   * @param update   the feature update to be processed (this can not be meant to delete the feature)
+   *
+   * @return         the new FinalizedVersionRange or error, as described above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, ApiError] = {
+    if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+      throw new IllegalArgumentException(s"Provided feature update can not be meant to delete the feature: $update")
+    }
+    // NOTE: Below we set the finalized min version level to be the default minimum version
+    // level. If the finalized feature already exists, then, this can cause deprecation of all
+    // version levels in the closed range:
+    // [existingVersionRange.min(), defaultMinVersionLevel - 1].
+    val defaultMinVersionLevel = brokerFeatures.defaultMinVersionLevel(update.feature)
+    val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel, update.maxVersionLevel)
+    val numIncompatibleBrokers = controllerContext.liveOrShuttingDownBrokers.count(broker => {
+      val singleFinalizedFeature =
+        Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, newVersionRange)))
+      BrokerFeatures.hasIncompatibleFeatures(broker.features, singleFinalizedFeature)
+    })
+    if (numIncompatibleBrokers == 0) {
+      Left(newVersionRange)
+    } else {
+      Right(
+        new ApiError(Errors.INVALID_REQUEST,
+                     s"Could not apply finalized feature update because $numIncompatibleBrokers" +
+                     " brokers were found to have incompatible features."))
+    }
+  }
+
+  /**
+   * Validate and process a finalized feature update.
+   *
+   * If the processing is successful, then, the return value contains:
+   * 1. the new FinalizedVersionRange for the feature, if the feature update was not meant to delete the feature.
+   * 2. Option.empty, if the feature update was meant to delete the feature.
+   *
+   * If the processing failed, then returned value contains a suitable ApiError.
+   *
+   * @param update   the feature update to be processed.
+   *
+   * @return         the new FinalizedVersionRange or error, as described above.
+   */
+  private def processFeatureUpdate(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[Option[FinalizedVersionRange], ApiError] = {
+    val existingFeatures = featureCache.get
+      .map(featuresAndEpoch => featuresAndEpoch.features.features().asScala)
+      .getOrElse(Map[String, FinalizedVersionRange]())
+
+    def newVersionRangeOrError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[Option[FinalizedVersionRange], ApiError] = {
+      newFinalizedVersionRangeOrIncompatibilityError(update)
+        .fold(versionRange => Left(Some(versionRange)), error => Right(error))
+    }
+
+    if (update.feature.isEmpty) {
+      // Check that the feature name is not empty.
+      Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be empty."))
+    } else {
+      val cacheEntry = existingFeatures.get(update.feature).orNull
+
+      // We handle deletion requests separately from non-deletion requests.
+      if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+        if (cacheEntry == null) {
+          // Disallow deletion of a non-existing finalized feature.
+          Right(new ApiError(Errors.INVALID_REQUEST,
+                             s"Can not delete non-existing finalized feature: '${update.feature}'"))
+        } else {
+          Left(Option.empty)
+        }
+      } else if (update.maxVersionLevel() < 1) {
+        // Disallow deletion of a finalized feature without allowDowngrade flag set.
+        Right(new ApiError(Errors.INVALID_REQUEST,
+                           s"Can not provide maxVersionLevel: ${update.maxVersionLevel} less" +
+                           s" than 1 for feature: '${update.feature}' without setting the" +
+                           " allowDowngrade flag to true in the request."))
+      } else {
+        if (cacheEntry == null) {
+          newVersionRangeOrError(update)
+        } else {
+          if (update.maxVersionLevel == cacheEntry.max()) {
+            // Disallow a case where target maxVersionLevel matches existing maxVersionLevel.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+                               s"Can not ${if (update.allowDowngrade) "downgrade" else "upgrade"}" +
+                               s" a finalized feature: '${update.feature}' from existing" +
+                               s" maxVersionLevel:${cacheEntry.max} to the same value."))
+          } else if (update.maxVersionLevel < cacheEntry.max && !update.allowDowngrade) {
+            // Disallow downgrade of a finalized feature without the allowDowngrade flag set.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+                               s"Can not downgrade finalized feature: '${update.feature}' from" +
+                               s" existing maxVersionLevel:${cacheEntry.max} to provided" +
+                               s" maxVersionLevel:${update.maxVersionLevel} without setting the" +
+                               " allowDowngrade flag in the request."))
+          } else if (update.allowDowngrade && update.maxVersionLevel > cacheEntry.max) {

Review comment:
       I'm actually wondering whether this is too strict from the user's perspective. If they accidentally set a feature version larger than the cached one, all they care about is being able to change the version to it. So it's a matter of whether we consider this a user error, or something that can happen when the user gets stale feature information from a broker after the downgrade has already succeeded.
   
    If we want to keep this check, it makes sense to update the meta comments around `allowDowngrade` to inform users that the request could fail when the target version is actually higher than the current finalized feature.
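
    For example, the doc around `allowDowngrade` could gain a note along these lines (suggested wording only):

```scala
   * @param allowDowngrade   when true, permits lowering the feature's maxVersionLevel.
   *                         Note: even with this flag set, the request is rejected if
   *                         the provided maxVersionLevel is higher than the currently
   *                         finalized one (e.g. when the client acted on stale feature
   *                         metadata).
```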

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1647,6 +1844,185 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no feature
+   * incompatibilities seen with all known brokers for the provided feature update.
+   * Otherwise returns a suitable error.
+   *
+   * @param update   the feature update to be processed (this can not be meant to delete the feature)
+   *
+   * @return         the new FinalizedVersionRange or error, as described above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, ApiError] = {
+    if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+      throw new IllegalArgumentException(s"Provided feature update can not be meant to delete the feature: $update")
+    }
+    // NOTE: Below we set the finalized min version level to be the default minimum version
+    // level. If the finalized feature already exists, then, this can cause deprecation of all
+    // version levels in the closed range:
+    // [existingVersionRange.min(), defaultMinVersionLevel - 1].
+    val defaultMinVersionLevel = brokerFeatures.defaultMinVersionLevel(update.feature)
+    val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel, update.maxVersionLevel)
+    val numIncompatibleBrokers = controllerContext.liveOrShuttingDownBrokers.count(broker => {
+      val singleFinalizedFeature =
+        Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, newVersionRange)))
+      BrokerFeatures.hasIncompatibleFeatures(broker.features, singleFinalizedFeature)
+    })
+    if (numIncompatibleBrokers == 0) {
+      Left(newVersionRange)
+    } else {
+      Right(
+        new ApiError(Errors.INVALID_REQUEST,
+                     s"Could not apply finalized feature update because $numIncompatibleBrokers" +
+                     " brokers were found to have incompatible features."))
+    }
+  }
+
+  /**
+   * Validate and process a finalized feature update on an existing FinalizedVersionRange for the
+   * feature.
+   *
+   * If the processing is successful, then, the return value contains:
+   * 1. the new FinalizedVersionRange for the feature, if the feature update was not meant to delete the feature.
+   * 2. Option.empty, if the feature update was meant to delete the feature.
+   *
+   * If the processing failed, then returned value contains a suitable ApiError.
+   *
+   * @param update                 the feature update to be processed.
+   * @param existingVersionRange   the existing FinalizedVersionRange which can be empty when no
+   *                               FinalizedVersionRange exists for the associated feature
+   *
+   * @return                       the new FinalizedVersionRange or error, as described above.
+   */
+  private def processFeatureUpdate(update: UpdateFeaturesRequestData.FeatureUpdateKey,
+                                   existingVersionRange: Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] = {
+    def newVersionRangeOrError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[Option[FinalizedVersionRange], ApiError] = {
+      newFinalizedVersionRangeOrIncompatibilityError(update)
+        .fold(versionRange => Left(Some(versionRange)), error => Right(error))
+    }
+
+    if (update.feature.isEmpty) {
+      // Check that the feature name is not empty.
+      Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be empty."))
+    } else {
+      // We handle deletion requests separately from non-deletion requests.
+      if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+        if (existingVersionRange.isEmpty) {
+          // Disallow deletion of a non-existing finalized feature.
+          Right(new ApiError(Errors.INVALID_REQUEST,
+                             s"Can not delete non-existing finalized feature: '${update.feature}'"))
+        } else {
+          Left(Option.empty)
+        }
+      } else if (update.maxVersionLevel() < 1) {

Review comment:
       Is this case covered by the case on L1931? Could we merge both?

##########
File path: core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala
##########
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.junit.Assert.{assertEquals, assertThrows, assertTrue}
+import org.junit.Test
+
+import scala.jdk.CollectionConverters._
+
+class BrokerFeaturesTest {

Review comment:
       Some methods in `BrokerFeatures` are not covered by this suite, such as `defaultMinVersionLevel`, `getDefaultFinalizedFeatures` and `hasIncompatibleFeatures`; you could use a code coverage tool to figure out any missing parts.
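
    For instance, a `hasIncompatibleFeatures` test could look roughly like this (a sketch; the feature name and version values are made up, and `assertFalse` would need to be imported):

```scala
  @Test
  def testHasIncompatibleFeatures(): Unit = {
    val supported = Features.supportedFeatures(
      Map[String, SupportedVersionRange]("feature_1" -> new SupportedVersionRange(1, 4)).asJava)
    // A finalized range inside the supported range should be compatible.
    val compatible = Features.finalizedFeatures(
      Map[String, FinalizedVersionRange]("feature_1" -> new FinalizedVersionRange(2, 3)).asJava)
    // A finalized max above the supported max should be flagged as incompatible.
    val incompatible = Features.finalizedFeatures(
      Map[String, FinalizedVersionRange]("feature_1" -> new FinalizedVersionRange(2, 5)).asJava)
    assertFalse(BrokerFeatures.hasIncompatibleFeatures(supported, compatible))
    assertTrue(BrokerFeatures.hasIncompatibleFeatures(supported, incompatible))
  }
```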

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1395,7 +1596,7 @@ class KafkaController(val config: KafkaConfig,
     if (newMetadataOpt.nonEmpty && oldMetadataOpt.nonEmpty) {
       val oldMetadata = oldMetadataOpt.get
       val newMetadata = newMetadataOpt.get
-      if (newMetadata.endPoints != oldMetadata.endPoints) {
+      if (newMetadata.endPoints != oldMetadata.endPoints || !oldMetadata.features.equals(newMetadata.features)) {

Review comment:
       I see. Still, I'm a bit worried that future changes could break this assumption. Not a bad idea to add a `features != null` check?
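
    Something null-safe along these lines, perhaps (a sketch; `Objects.equals` treats two nulls as equal):

```scala
if (newMetadata.endPoints != oldMetadata.endPoints ||
    !java.util.Objects.equals(oldMetadata.features, newMetadata.features)) {
```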

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1647,6 +1844,185 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no feature
+   * incompatibilities seen with all known brokers for the provided feature update.
+   * Otherwise returns a suitable error.
+   *
+   * @param update   the feature update to be processed (this can not be meant to delete the feature)
+   *
+   * @return         the new FinalizedVersionRange or error, as described above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, ApiError] = {
+    if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+      throw new IllegalArgumentException(s"Provided feature update can not be meant to delete the feature: $update")
+    }
+    // NOTE: Below we set the finalized min version level to be the default minimum version
+    // level. If the finalized feature already exists, then, this can cause deprecation of all
+    // version levels in the closed range:
+    // [existingVersionRange.min(), defaultMinVersionLevel - 1].
+    val defaultMinVersionLevel = brokerFeatures.defaultMinVersionLevel(update.feature)
+    val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel, update.maxVersionLevel)
+    val numIncompatibleBrokers = controllerContext.liveOrShuttingDownBrokers.count(broker => {
+      val singleFinalizedFeature =
+        Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, newVersionRange)))
+      BrokerFeatures.hasIncompatibleFeatures(broker.features, singleFinalizedFeature)
+    })
+    if (numIncompatibleBrokers == 0) {
+      Left(newVersionRange)
+    } else {
+      Right(
+        new ApiError(Errors.INVALID_REQUEST,
+                     s"Could not apply finalized feature update because $numIncompatibleBrokers" +
+                     " brokers were found to have incompatible features."))
+    }
+  }
+
+  /**
+   * Validate and process a finalized feature update on an existing FinalizedVersionRange for the
+   * feature.
+   *
+   * If the processing is successful, then, the return value contains:
+   * 1. the new FinalizedVersionRange for the feature, if the feature update was not meant to delete the feature.
+   * 2. Option.empty, if the feature update was meant to delete the feature.
+   *
+   * If the processing failed, then returned value contains a suitable ApiError.
+   *
+   * @param update                 the feature update to be processed.
+   * @param existingVersionRange   the existing FinalizedVersionRange which can be empty when no
+   *                               FinalizedVersionRange exists for the associated feature
+   *
+   * @return                       the new FinalizedVersionRange or error, as described above.
+   */
+  private def processFeatureUpdate(update: UpdateFeaturesRequestData.FeatureUpdateKey,
+                                   existingVersionRange: Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] = {
+    def newVersionRangeOrError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[Option[FinalizedVersionRange], ApiError] = {
+      newFinalizedVersionRangeOrIncompatibilityError(update)
+        .fold(versionRange => Left(Some(versionRange)), error => Right(error))
+    }
+
+    if (update.feature.isEmpty) {
+      // Check that the feature name is not empty.
+      Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be empty."))
+    } else {
+      // We handle deletion requests separately from non-deletion requests.
+      if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+        if (existingVersionRange.isEmpty) {
+          // Disallow deletion of a non-existing finalized feature.
+          Right(new ApiError(Errors.INVALID_REQUEST,
+                             s"Can not delete non-existing finalized feature: '${update.feature}'"))
+        } else {
+          Left(Option.empty)
+        }
+      } else if (update.maxVersionLevel() < 1) {
+        // Disallow deletion of a finalized feature without allowDowngrade flag set.
+        Right(new ApiError(Errors.INVALID_REQUEST,
+                           s"Can not provide maxVersionLevel: ${update.maxVersionLevel} less" +
+                           s" than 1 for feature: '${update.feature}' without setting the" +
+                           " allowDowngrade flag to true in the request."))
+      } else {
+        existingVersionRange.map(existing =>
+          if (update.maxVersionLevel == existing.max) {
+            // Disallow a case where target maxVersionLevel matches existing maxVersionLevel.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+              s"Can not ${if (update.allowDowngrade) "downgrade" else "upgrade"}" +
+                s" a finalized feature: '${update.feature}' from existing" +
+                s" maxVersionLevel:${existing.max} to the same value."))
+          } else if (update.maxVersionLevel < existing.max && !update.allowDowngrade) {
+            // Disallow downgrade of a finalized feature without the allowDowngrade flag set.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+              s"Can not downgrade finalized feature: '${update.feature}' from" +
+                s" existing maxVersionLevel:${existing.max} to provided" +
+                s" maxVersionLevel:${update.maxVersionLevel} without setting the" +
+                " allowDowngrade flag in the request."))
+          } else if (update.allowDowngrade && update.maxVersionLevel > existing.max) {
+            // Disallow a request that sets allowDowngrade flag without specifying a
+            // maxVersionLevel that's lower than the existing maxVersionLevel.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+              s"When finalized feature: '${update.feature}' has the allowDowngrade" +
+                " flag set in the request, the provided" +
+                s" maxVersionLevel:${update.maxVersionLevel} can not be greater than" +
+                s" existing maxVersionLevel:${existing.max}."))
+          } else if (update.maxVersionLevel() < existing.min) {
+            // Disallow downgrade of a finalized feature below the existing finalized
+            // minVersionLevel.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+              s"Can not downgrade finalized feature: '${update.feature}' to" +
+                s" maxVersionLevel:${update.maxVersionLevel} because it's lower than" +
+                s" the existing minVersionLevel:${existing.min}."))
+          } else {
+            newVersionRangeOrError(update)
+          }
+        ).getOrElse(newVersionRangeOrError(update))
+      }
+    }
+  }
+
+  private def processFeatureUpdates(request: UpdateFeaturesRequest,
+                                    callback: UpdateFeaturesCallback): Unit = {
+    if (isActive) {
+      processFeatureUpdatesWithActiveController(request, callback)
+    } else {
+      val results = request.data().featureUpdates().asScala.map {
+        update => update.feature() -> new ApiError(Errors.NOT_CONTROLLER)
+      }.toMap
+      callback(results)
+    }
+  }
+
+  private def processFeatureUpdatesWithActiveController(request: UpdateFeaturesRequest,
+                                                        callback: UpdateFeaturesCallback): Unit = {
+    val updates = request.data.featureUpdates
+    val existingFeatures = featureCache.get
+      .map(featuresAndEpoch => featuresAndEpoch.features.features().asScala)
+      .getOrElse(Map[String, FinalizedVersionRange]())
+    // Map of feature to FinalizedVersionRange. This contains the target features to be eventually
+    // written to FeatureZNode.
+    val targetFeatures = scala.collection.mutable.Map[String, FinalizedVersionRange]() ++ existingFeatures
+    // Map of feature to error.
+    var errors = scala.collection.mutable.Map[String, ApiError]()
+
+    // Process each FeatureUpdate.
+    // If a FeatureUpdate is found to be valid, then the corresponding entry in errors would contain
+    // Errors.NONE. Otherwise the entry would contain the appropriate error.
+    updates.asScala.iterator.foreach { update =>
+      processFeatureUpdate(update, existingFeatures.get(update.feature())) match {
+        case Left(newVersionRangeOrNone) =>
+          newVersionRangeOrNone
+            .map(newVersionRange => targetFeatures += (update.feature() -> newVersionRange))
+            .getOrElse(targetFeatures -= update.feature())
+          errors += (update.feature() -> new ApiError(Errors.NONE))
+        case Right(featureUpdateFailureReason) =>
+          errors += (update.feature() -> featureUpdateFailureReason)
+      }
+    }
+
+    if (existingFeatures.equals(targetFeatures)) {

Review comment:
       Could you clarify the reasoning here? If the structs are not the same, are we going to do a partial update?

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1647,6 +1844,185 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no feature
+   * incompatibilities seen with all known brokers for the provided feature update.
+   * Otherwise returns a suitable error.
+   *
+   * @param update   the feature update to be processed (this can not be meant to delete the feature)
+   *
+   * @return         the new FinalizedVersionRange or error, as described above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, ApiError] = {
+    if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+      throw new IllegalArgumentException(s"Provided feature update can not be meant to delete the feature: $update")
+    }
+    // NOTE: Below we set the finalized min version level to be the default minimum version
+    // level. If the finalized feature already exists, then, this can cause deprecation of all
+    // version levels in the closed range:
+    // [existingVersionRange.min(), defaultMinVersionLevel - 1].
+    val defaultMinVersionLevel = brokerFeatures.defaultMinVersionLevel(update.feature)
+    val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel, update.maxVersionLevel)
+    val numIncompatibleBrokers = controllerContext.liveOrShuttingDownBrokers.count(broker => {
+      val singleFinalizedFeature =
+        Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, newVersionRange)))
+      BrokerFeatures.hasIncompatibleFeatures(broker.features, singleFinalizedFeature)
+    })
+    if (numIncompatibleBrokers == 0) {
+      Left(newVersionRange)
+    } else {
+      Right(
+        new ApiError(Errors.INVALID_REQUEST,
+                     s"Could not apply finalized feature update because $numIncompatibleBrokers" +
+                     " brokers were found to have incompatible features."))
+    }
+  }
+
+  /**
+   * Validate and process a finalized feature update on an existing FinalizedVersionRange for the
+   * feature.
+   *
+   * If the processing is successful, then, the return value contains:
+   * 1. the new FinalizedVersionRange for the feature, if the feature update was not meant to delete the feature.
+   * 2. Option.empty, if the feature update was meant to delete the feature.
+   *
+   * If the processing failed, then returned value contains a suitable ApiError.
+   *
+   * @param update                 the feature update to be processed.
+   * @param existingVersionRange   the existing FinalizedVersionRange which can be empty when no
+   *                               FinalizedVersionRange exists for the associated feature
+   *
+   * @return                       the new FinalizedVersionRange or error, as described above.
+   */
+  private def processFeatureUpdate(update: UpdateFeaturesRequestData.FeatureUpdateKey,
+                                   existingVersionRange: Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] = {
+    def newVersionRangeOrError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[Option[FinalizedVersionRange], ApiError] = {
+      newFinalizedVersionRangeOrIncompatibilityError(update)
+        .fold(versionRange => Left(Some(versionRange)), error => Right(error))
+    }
+
+    if (update.feature.isEmpty) {
+      // Check that the feature name is not empty.
+      Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be empty."))
+    } else {
+      // We handle deletion requests separately from non-deletion requests.
+      if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+        if (existingVersionRange.isEmpty) {
+          // Disallow deletion of a non-existing finalized feature.
+          Right(new ApiError(Errors.INVALID_REQUEST,
+                             s"Can not delete non-existing finalized feature: '${update.feature}'"))
+        } else {
+          Left(Option.empty)
+        }
+      } else if (update.maxVersionLevel() < 1) {
+        // Disallow deletion of a finalized feature without allowDowngrade flag set.
+        Right(new ApiError(Errors.INVALID_REQUEST,
+                           s"Can not provide maxVersionLevel: ${update.maxVersionLevel} less" +
+                           s" than 1 for feature: '${update.feature}' without setting the" +
+                           " allowDowngrade flag to true in the request."))
+      } else {
+        existingVersionRange.map(existing =>
+          if (update.maxVersionLevel == existing.max) {
+            // Disallow a case where target maxVersionLevel matches existing maxVersionLevel.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+              s"Can not ${if (update.allowDowngrade) "downgrade" else "upgrade"}" +
+                s" a finalized feature: '${update.feature}' from existing" +
+                s" maxVersionLevel:${existing.max} to the same value."))
+          } else if (update.maxVersionLevel < existing.max && !update.allowDowngrade) {
+            // Disallow downgrade of a finalized feature without the allowDowngrade flag set.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+              s"Can not downgrade finalized feature: '${update.feature}' from" +
+                s" existing maxVersionLevel:${existing.max} to provided" +
+                s" maxVersionLevel:${update.maxVersionLevel} without setting the" +
+                " allowDowngrade flag in the request."))
+          } else if (update.allowDowngrade && update.maxVersionLevel > existing.max) {
+            // Disallow a request that sets allowDowngrade flag without specifying a
+            // maxVersionLevel that's lower than the existing maxVersionLevel.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+              s"When finalized feature: '${update.feature}' has the allowDowngrade" +
+                " flag set in the request, the provided" +
+                s" maxVersionLevel:${update.maxVersionLevel} can not be greater than" +
+                s" existing maxVersionLevel:${existing.max}."))
+          } else if (update.maxVersionLevel() < existing.min) {

Review comment:
       We should be consistent and remove the `()` from `maxVersionLevel`.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -39,7 +42,7 @@ case class FinalizedFeaturesAndEpoch(features: Features[FinalizedVersionRange],
  *
  * @see FinalizedFeatureChangeListener
  */
-object FinalizedFeatureCache extends Logging {
+class FinalizedFeatureCache(private val brokerFeatures: BrokerFeatures) extends Logging {

Review comment:
       The meta comment for `FinalizedFeatureCache` should be updated, as the cache is now accessed for both reads and writes.
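
    Perhaps something like (suggested wording only):

```scala
/**
 * A common cache containing the latest finalized features and epoch. Besides being
 * read by request handling paths, it is now also written to, e.g. when the
 * FinalizedFeatureChangeListener processes a ZK notification.
 */
```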

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2945,6 +2948,130 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleUpdateFeatures(request: RequestChannel.Request): Unit = {

Review comment:
       This doesn't seem to be covered by unit tests yet.

##########
File path: core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala
##########
@@ -78,25 +76,42 @@ class FinalizedFeatureChangeListenerTest extends ZooKeeperTestHarness {
   /**
    * Tests that the listener can be initialized, and that it can listen to ZK notifications
    * successfully from an "Enabled" FeatureZNode (the ZK data has no feature incompatibilities).
+   * Particularly the test checks if multiple notifications can be processed in ZK
+   * (i.e. whether the FeatureZNode watch can be re-established).
    */
   @Test
   def testInitSuccessAndNotificationSuccess(): Unit = {
-    createSupportedFeatures()
     val initialFinalizedFeatures = createFinalizedFeatures()
-    val listener = createListener(Some(initialFinalizedFeatures))
+    val brokerFeatures = createBrokerFeatures()
+    val cache = new FinalizedFeatureCache(brokerFeatures)
+    val listener = createListener(cache, Some(initialFinalizedFeatures))
 
-    val updatedFinalizedFeaturesMap = Map[String, FinalizedVersionRange](
-      "feature_1" -> new FinalizedVersionRange(2, 4))
-    val updatedFinalizedFeatures = Features.finalizedFeatures(updatedFinalizedFeaturesMap.asJava)
-    zkClient.updateFeatureZNode(FeatureZNode(FeatureZNodeStatus.Enabled, updatedFinalizedFeatures))
-    val (mayBeFeatureZNodeNewBytes, updatedVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
-    assertNotEquals(updatedVersion, ZkVersion.UnknownVersion)
-    assertFalse(mayBeFeatureZNodeNewBytes.isEmpty)
-    assertTrue(updatedVersion > initialFinalizedFeatures.epoch)
-    TestUtils.waitUntilTrue(() => {
-      FinalizedFeatureCache.get.get.equals(FinalizedFeaturesAndEpoch(updatedFinalizedFeatures, updatedVersion))
-    }, "Timed out waiting for FinalizedFeatureCache to be updated with new features")
-    assertTrue(listener.isListenerInitiated)
+    def updateAndCheckCache(finalizedFeatures: Features[FinalizedVersionRange]): Unit = {
+      zkClient.updateFeatureZNode(FeatureZNode(FeatureZNodeStatus.Enabled, finalizedFeatures))
+      val (mayBeFeatureZNodeNewBytes, updatedVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+      assertNotEquals(updatedVersion, ZkVersion.UnknownVersion)
+      assertFalse(mayBeFeatureZNodeNewBytes.isEmpty)
+      assertTrue(updatedVersion > initialFinalizedFeatures.epoch)
+
+      cache.waitUntilEpochOrThrow(updatedVersion, JTestUtils.DEFAULT_MAX_WAIT_MS)
+      assertEquals(FinalizedFeaturesAndEpoch(finalizedFeatures, updatedVersion), cache.get.get)
+      assertTrue(listener.isListenerInitiated)
+    }
+
+    // Check if the write succeeds and a ZK notification is received that causes the feature cache
+    // to be populated.
+    updateAndCheckCache(
+      Features.finalizedFeatures(
+        Map[String, FinalizedVersionRange](
+      "feature_1" -> new FinalizedVersionRange(2, 4)).asJava))

Review comment:
       Indentation is not right.
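
    e.g. the map entry should be indented one level under `Map[String, FinalizedVersionRange](`:

```scala
    updateAndCheckCache(
      Features.finalizedFeatures(
        Map[String, FinalizedVersionRange](
          "feature_1" -> new FinalizedVersionRange(2, 4)).asJava))
```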

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -109,7 +109,9 @@ class KafkaApis(val requestChannel: RequestChannel,
                 brokerTopicStats: BrokerTopicStats,
                 val clusterId: String,
                 time: Time,
-                val tokenManager: DelegationTokenManager) extends Logging {
+                val tokenManager: DelegationTokenManager,
+                val brokerFeatures: BrokerFeatures,

Review comment:
       Could we only pass in `featureCache` to reduce the class coupling here? As we already have `brokerFeatures` as a private parameter there, it shouldn't be too hard to add a helper to get the supported features.
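
    For example (a sketch; the helper name is just a suggestion):

```scala
class FinalizedFeatureCache(private val brokerFeatures: BrokerFeatures) extends Logging {
  // Expose the broker's supported features through the cache, so that callers such as
  // KafkaApis only need a reference to the cache.
  def supportedFeatures: Features[SupportedVersionRange] = brokerFeatures.supportedFeatures
}
```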




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

