abbccdda commented on a change in pull request #8680:
URL: https://github.com/apache/kafka/pull/8680#discussion_r429476928



##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -47,7 +56,30 @@ class FinalizedFeatureChangeListener(zkClient: 
KafkaZkClient) extends Logging {
       })
 
       info(s"Reading feature ZK node at path: $featureZkNodePath")
-      val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(featureZkNodePath)
+      var mayBeFeatureZNodeBytes: Option[Array[Byte]] = null
+      var version: Int = ZkVersion.UnknownVersion
+      try {
+        val result = zkClient.getDataAndVersion(featureZkNodePath)
+        mayBeFeatureZNodeBytes = result._1
+        version = result._2
+      } catch {
+        // Convert to RuntimeException, to avoid a confusion that there is no 
argument passed
+        // to the updateOrThrow() method.
+        case e: IllegalArgumentException => throw new RuntimeException(e)

Review comment:
       I didn't look thoroughly enough, but the only IllegalArgumentException I 
found is 
   ```
    case invalidVersion =>
           throw new IllegalArgumentException(s"Expected controller epoch 
zkVersion $invalidVersion should be non-negative or equal to 
${ZkVersion.MatchAnyVersion}")
   ```
   which should never happen as we always use `MatchAnyVersion` in 
`retryRequestsUntilConnected`. Are we trying to catch some other exceptions 
here?

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -37,6 +37,15 @@ class FinalizedFeatureChangeListener(zkClient: 
KafkaZkClient) extends Logging {
      *
      * NOTE: if a notifier was provided in the constructor, then, this method 
can be invoked
      * only exactly once successfully.
+     *
+     * @throws   IllegalStateException, if a non-empty notifier was provided 
in the constructor, and
+     *           this method is called again after a successful previous 
invocation.
+     *
+     *           FeatureCacheUpdateException, if there was an error in 
updating the

Review comment:
       We could have multiple @throws here

##########
File path: 
clients/src/test/java/org/apache/kafka/common/feature/FeaturesTest.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+
+public class FeaturesTest {
+
+    @Test
+    public void testEmptyFeatures() {
+        Map<String, Map<String, Long>> emptyMap = new HashMap<>();
+
+        Features<FinalizedVersionRange> emptyFinalizedFeatures = 
Features.emptyFinalizedFeatures();
+        assertEquals(new HashMap<>(), emptyFinalizedFeatures.features());
+        assertEquals(emptyMap, emptyFinalizedFeatures.serialize());
+        assertEquals(emptyFinalizedFeatures, 
Features.deserializeFinalizedFeatures(emptyMap));
+
+        Features<SupportedVersionRange> emptySupportedFeatures = 
Features.emptySupportedFeatures();
+        assertEquals(new HashMap<>(), emptySupportedFeatures.features());

Review comment:
       nit: we could test `emptySupportedFeatures.features().isEmpty()`

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,232 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a 
change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is 
asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized 
through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to notify 
the caller when an
+   *                            updateOrThrow() operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, 
maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest 
features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, 
then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method 
can be invoked
+     * only exactly once successfully.

Review comment:
       nit: remove `only`

##########
File path: core/src/main/scala/kafka/zk/ZkData.scala
##########
@@ -744,6 +782,90 @@ object DelegationTokenInfoZNode {
   def decode(bytes: Array[Byte]): Option[TokenInformation] = 
DelegationTokenManager.fromBytes(bytes)
 }
 
+object FeatureZNodeStatus extends Enumeration {
+  val Disabled, Enabled = Value
+
+  def withNameOpt(value: Int): Option[Value] = {
+    values.find(_.id == value)
+  }
+}
+
+case class FeatureZNode(status: FeatureZNodeStatus.Value, features: 
Features[FinalizedVersionRange]) {
+}
+
+object FeatureZNode {

Review comment:
       I think the norm exists because we don't have automated framework by 
then, and doing hand-written json serialization and deserialization is a bit 
wasting. cc @hachikuji @cmccabe as this is a major direction discussion.

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being <VersionRangeType>.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ * @see SupportedVersionRange
+ * @see FinalizedVersionRange
+ */
+public class Features<VersionRangeType extends BaseVersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the 
caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the 
backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        if (features == null) {

Review comment:
       nit: one liner: `this.features = Objects.requireNonNull(features, 
"Provided features can not be null.");`

##########
File path: 
clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Represents an immutable basic version range using 2 attributes: min and max 
of type long.
+ * The min and max attributes are expected to be >= 1, and with max >= min.
+ *
+ * The class also provides API to serialize/deserialize the version range 
to/from a map.
+ * The class allows for configurable labels for the min/max attributes, which 
can be specialized by
+ * sub-classes (if needed).
+ */
+class BaseVersionRange {
+    private final String minKeyLabel;
+
+    private final long minValue;
+
+    private final String maxKeyLabel;
+
+    private final long maxValue;
+
+    protected BaseVersionRange(String minKey, long minValue, String 
maxKeyLabel, long maxValue) {

Review comment:
       nit: minKeyLabel

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,203 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a 
change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is 
asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized 
through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to propvide 
notification
+   *                            when an update operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, 
maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest 
features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, 
then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method 
can be invoked
+     * only exactly once successfully.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than 
once successfully.")
+        }
+      })
+
+      info(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(featureZkNodePath)
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        if (featureZNode.status == FeatureZNodeStatus.Disabled) {
+          info(s"Feature ZK node at path: $featureZkNodePath is in disabled 
status")
+          FinalizedFeatureCache.clear()
+        } else if(featureZNode.status == FeatureZNodeStatus.Enabled) {
+          FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+        } else {
+          throw new IllegalStateException(s"Unexpected FeatureZNodeStatus 
found in $featureZNode")
+        }
+      }
+
+      maybeNotifyOnce.foreach(notifier => notifier.countDown())
+    }
+
+    /**
+     * Waits until at least a single updateLatestOrThrow completes 
successfully.
+     * NOTE: The method returns immediately if an updateLatestOrThrow call has 
already completed
+     * successfully.
+     *
+     * @param waitTimeMs   the timeout for the wait operation
+     *
+     * @throws             - RuntimeException if the thread was interrupted 
during wait
+     *                     - TimeoutException if the wait can not be completed 
in waitTimeMs
+     *                       milli seconds
+     */
+    def awaitUpdateOrThrow(waitTimeMs: Long): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        var success = false
+        try {
+          success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS)
+        } catch {
+          case e: InterruptedException =>
+            throw new RuntimeException(
+              "Unable to wait for FinalizedFeatureCache update to finish.", e)
+        }
+
+        if (!success) {
+          throw new TimeoutException(
+            s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to 
be updated.")
+        }
+      })
+    }
+  }
+
+  /**
+   * A shutdownable thread to process feature node change notifications that 
are populated into the
+   * queue. If any change notification can not be processed successfully 
(unless it is due to an
+   * interrupt), the thread treats it as a fatal event and triggers Broker 
exit.
+   *
+   * @param name   name of the thread
+   */
+  private class ChangeNotificationProcessorThread(name: String) extends 
ShutdownableThread(name = name) {
+    override def doWork(): Unit = {
+      try {
+        queue.take.updateLatestOrThrow()
+      } catch {
+        case e: InterruptedException => info(s"Interrupted", e)
+        case e: Exception => {
+          error("Failed to process feature ZK node change event. The broker 
will exit.", e)
+          throw new FatalExitError(1)
+        }
+      }
+    }
+  }
+
+  // Feature ZK node change handler.
+  object FeatureZNodeChangeHandler extends ZNodeChangeHandler {
+    override val path: String = FeatureZNode.path
+
+    private def processNotification(): Unit = {
+      queue.add(new FeatureCacheUpdater(path))
+    }
+
+    override def handleCreation(): Unit = {
+      info(s"Feature ZK node created at path: $path")

Review comment:
       My feeling is that this could be on debug level, but no strong 
perference.

##########
File path: clients/src/main/resources/common/message/ApiVersionsResponse.json
##########
@@ -42,6 +42,33 @@
         "about": "The maximum supported version, inclusive." }
     ]},
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", 
"ignorable": true,
-      "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." }
+      "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },

Review comment:
       I see, makes sense.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -0,0 +1,86 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange}
+
+// Raised whenever there was an error in updating the FinalizedFeatureCache 
with features.
+class FeatureCacheUpdateException(message: String) extends 
RuntimeException(message) {
+}
+
+// Helper class that represents finalized features along with an epoch value.
+case class FinalizedFeaturesAndEpoch(features: 
Features[FinalizedVersionRange], epoch: Int) {
+  override def toString(): String = {
+    "FinalizedFeaturesAndEpoch(features=%s, epoch=%s)".format(features, epoch)
+  }
+}
+
+/**
+ * A mutable cache containing the latest finalized features and epoch. This 
cache is populated by a
+ * {@link FinalizedFeatureChangeListener}.
+ *
+ * Currently the main reader of this cache is the read path that serves an 
ApiVersionsRequest,
+ * returning the features information in the response.
+ */
+object FinalizedFeatureCache extends Logging {
+  @volatile private var featuresAndEpoch: Option[FinalizedFeaturesAndEpoch] = 
Option.empty
+
+  /**
+   * @return   the latest known FinalizedFeaturesAndEpoch. If the returned 
value is empty, it means
+   *           no FinalizedFeaturesAndEpoch exists in the cache at the time 
when this
+   *           method is invoked. This result could change in the future 
whenever the
+   *           updateOrThrow method is invoked.
+   */
+  def get: Option[FinalizedFeaturesAndEpoch] = {
+    featuresAndEpoch
+  }
+
+  def empty: Boolean = {
+    featuresAndEpoch.isEmpty
+  }
+
+  /**
+   * Clears all existing finalized features and epoch from the cache.
+   */
+  def clear(): Unit = {
+    featuresAndEpoch = Option.empty
+    info("Cleared cache")
+  }
+
+  /**
+   * Updates the cache to the latestFeatures, and updates the existing epoch 
to latestEpoch.
+   * Raises an exception when the operation is not successful.
+   *
+   * @param latestFeatures   the latest finalized features to be set in the 
cache
+   * @param latestEpoch      the latest epoch value to be set in the cache
+   *
+   * @throws                 FeatureCacheUpdateException if the cache update 
operation fails
+   *                         due to invalid parameters or incompatibilities 
with the broker's
+   *                         supported features. In such a case, the existing 
cache contents are
+   *                         not modified.
+   */
+  def updateOrThrow(latestFeatures: Features[FinalizedVersionRange], 
latestEpoch: Int): Unit = {
+    updateOrThrow(FinalizedFeaturesAndEpoch(latestFeatures, latestEpoch))
+  }
+
+  private def updateOrThrow(latest: FinalizedFeaturesAndEpoch): Unit = {

Review comment:
       nit: could you elaborate why this helper function and 
`FinalizedFeaturesAndEpoch` struct is useful in this context? Just for easier 
message printing?

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,232 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a 
change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is 
asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized 
through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to notify 
the caller when an
+   *                            updateOrThrow() operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, 
maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest 
features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, 
then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method 
can be invoked
+     * only exactly once successfully.
+     *
+     * @throws   IllegalStateException, if a non-empty notifier was provided 
in the constructor, and
+     *           this method is called again after a successful previous 
invocation.
+     *
+     *           FeatureCacheUpdateException, if there was an error in 
updating the
+     *           FinalizedFeatureCache.
+     *
+     *           RuntimeException, if there was a failure in 
reading/deserializing the
+     *           contents of the feature ZK node.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than 
once successfully.")
+        }
+      })
+
+      info(s"Reading feature ZK node at path: $featureZkNodePath")
+      var mayBeFeatureZNodeBytes: Option[Array[Byte]] = null
+      var version: Int = ZkVersion.UnknownVersion
+      try {
+        val result = zkClient.getDataAndVersion(featureZkNodePath)
+        mayBeFeatureZNodeBytes = result._1
+        version = result._2
+      } catch {
+        // Convert to RuntimeException, to avoid a confusion that there is no 
argument passed
+        // to the updateOrThrow() method.
+        case e: IllegalArgumentException => throw new RuntimeException(e)
+      }
+
+      // There are 4 cases:
+      //
+      // (empty dataBytes, valid version)       => The empty dataBytes will 
fail FeatureZNode deserialization.
+      //                                           FeatureZNode, when present 
in ZK, can not have empty contents.
+      // (non-empty dataBytes, valid version)   => This is a valid case, and 
should pass FeatureZNode deserialization
+      //                                           if dataBytes contains valid 
data.
+      // (empty dataBytes, unknown version)     => This is a valid case, and 
this can happen if the FeatureZNode
+      //                                           does not exist in ZK.
+      // (non-empty dataBytes, unknown version) => This case is impossible, 
since, KafkaZkClient.getDataAndVersion
+      //                                           API ensures that unknown 
version is returned only when the
+      //                                           ZK node is absent. 
Therefore dataBytes should be empty in such
+      //                                           a case.
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        if (featureZNode.status == FeatureZNodeStatus.Disabled) {
+          info(s"Feature ZK node at path: $featureZkNodePath is in disabled 
status")
+          FinalizedFeatureCache.clear()
+        } else if(featureZNode.status == FeatureZNodeStatus.Enabled) {
+          FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+        } else {
+          throw new IllegalStateException(s"Unexpected FeatureZNodeStatus 
found in $featureZNode")
+        }
+      }
+
+      maybeNotifyOnce.foreach(notifier => notifier.countDown())
+    }
+
+    /**
+     * Waits until at least a single updateLatestOrThrow completes 
successfully.
+     * NOTE: The method returns immediately if an updateLatestOrThrow call has 
already completed
+     * successfully.
+     *
+     * @param waitTimeMs   the timeout for the wait operation
+     *
+     * @throws             RuntimeException if the thread was interrupted 
during wait
+     *
+     *                     TimeoutException if the wait can not be completed 
in waitTimeMs
+     *                     milli seconds
+     */
+    def awaitUpdateOrThrow(waitTimeMs: Long): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        var success = false
+        try {
+          success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS)
+        } catch {
+          case e: InterruptedException =>
+            throw new RuntimeException(
+              "Unable to wait for FinalizedFeatureCache update to finish.", e)
+        }
+
+        if (!success) {
+          throw new TimeoutException(
+            s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to 
be updated.")
+        }
+      })
+    }
+  }
+
+  /**
+   * A shutdownable thread to process feature node change notifications that 
are populated into the
+   * queue. If any change notification can not be processed successfully 
(unless it is due to an
+   * interrupt), the thread treats it as a fatal event and triggers Broker 
exit.
+   *
+   * @param name   name of the thread
+   */
+  private class ChangeNotificationProcessorThread(name: String) extends 
ShutdownableThread(name = name) {
+    override def doWork(): Unit = {
+      try {
+        queue.take.updateLatestOrThrow()
+      } catch {
+        case e: InterruptedException => info(s"Change notification queue 
interrupted", e)
+        case e: Exception => {
+          error("Failed to process feature ZK node change event. The broker 
will exit.", e)
+          throw new FatalExitError(1)
+        }
+      }
+    }
+  }
+
+  // Feature ZK node change handler.
+  object FeatureZNodeChangeHandler extends ZNodeChangeHandler {
+    override val path: String = FeatureZNode.path
+
+    override def handleCreation(): Unit = {
+      info(s"Feature ZK node created at path: $path")
+      queue.add(new FeatureCacheUpdater(path))
+    }
+
+    override def handleDataChange(): Unit = {
+      info(s"Feature ZK node updated at path: $path")
+      queue.add(new FeatureCacheUpdater(path))
+    }
+
+    override def handleDeletion(): Unit = {
+      warn(s"Feature ZK node deleted at path: $path")
+      // This event may happen, rarely (ex: operational error).

Review comment:
       This leads to a more general question: is there a way to cleanup all the 
ZK feature path? Reading from the KIP, I don't see we have any admin API to do 
so, which makes me wonder how could this case happen in reality. In terms of 
severity, I think crushing the entire cluster seems to be an overkill as well, 
maybe we should have some blocking mechanism in place for any feature 
extraction call here, until we see `handleCreation` gets triggered again?

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,232 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a 
change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is 
asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized 
through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to notify 
the caller when an
+   *                            updateOrThrow() operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, 
maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest 
features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, 
then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method 
can be invoked
+     * only exactly once successfully.
+     *
+     * @throws   IllegalStateException, if a non-empty notifier was provided 
in the constructor, and
+     *           this method is called again after a successful previous 
invocation.
+     *
+     *           FeatureCacheUpdateException, if there was an error in 
updating the
+     *           FinalizedFeatureCache.
+     *
+     *           RuntimeException, if there was a failure in 
reading/deserializing the
+     *           contents of the feature ZK node.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than 
once successfully.")
+        }
+      })
+
+      info(s"Reading feature ZK node at path: $featureZkNodePath")
+      var mayBeFeatureZNodeBytes: Option[Array[Byte]] = null
+      var version: Int = ZkVersion.UnknownVersion
+      try {
+        val result = zkClient.getDataAndVersion(featureZkNodePath)
+        mayBeFeatureZNodeBytes = result._1
+        version = result._2
+      } catch {
+        // Convert to RuntimeException, to avoid a confusion that there is no 
argument passed
+        // to the updateOrThrow() method.
+        case e: IllegalArgumentException => throw new RuntimeException(e)
+      }
+
+      // There are 4 cases:
+      //
+      // (empty dataBytes, valid version)       => The empty dataBytes will 
fail FeatureZNode deserialization.
+      //                                           FeatureZNode, when present 
in ZK, can not have empty contents.
+      // (non-empty dataBytes, valid version)   => This is a valid case, and 
should pass FeatureZNode deserialization
+      //                                           if dataBytes contains valid 
data.
+      // (empty dataBytes, unknown version)     => This is a valid case, and 
this can happen if the FeatureZNode
+      //                                           does not exist in ZK.
+      // (non-empty dataBytes, unknown version) => This case is impossible, 
since, KafkaZkClient.getDataAndVersion
+      //                                           API ensures that unknown 
version is returned only when the
+      //                                           ZK node is absent. 
Therefore dataBytes should be empty in such
+      //                                           a case.
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)

Review comment:
       So here we will directly throw NoSuchElementException if 
`mayBeFeatureZNodeBytes` is empty? Do we want to check this case and throw a 
customized exception instead?

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,232 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a 
change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is 
asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized 
through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to notify 
the caller when an
+   *                            updateOrThrow() operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, 
maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest 
features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, 
then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method 
can be invoked
+     * only exactly once successfully.
+     *
+     * @throws   IllegalStateException, if a non-empty notifier was provided 
in the constructor, and
+     *           this method is called again after a successful previous 
invocation.
+     *
+     *           FeatureCacheUpdateException, if there was an error in 
updating the
+     *           FinalizedFeatureCache.
+     *
+     *           RuntimeException, if there was a failure in 
reading/deserializing the
+     *           contents of the feature ZK node.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than 
once successfully.")
+        }
+      })
+
+      info(s"Reading feature ZK node at path: $featureZkNodePath")
+      var mayBeFeatureZNodeBytes: Option[Array[Byte]] = null
+      var version: Int = ZkVersion.UnknownVersion
+      try {
+        val result = zkClient.getDataAndVersion(featureZkNodePath)
+        mayBeFeatureZNodeBytes = result._1
+        version = result._2
+      } catch {
+        // Convert to RuntimeException, to avoid a confusion that there is no 
argument passed
+        // to the updateOrThrow() method.
+        case e: IllegalArgumentException => throw new RuntimeException(e)
+      }
+
+      // There are 4 cases:
+      //
+      // (empty dataBytes, valid version)       => The empty dataBytes will 
fail FeatureZNode deserialization.
+      //                                           FeatureZNode, when present 
in ZK, can not have empty contents.
+      // (non-empty dataBytes, valid version)   => This is a valid case, and 
should pass FeatureZNode deserialization
+      //                                           if dataBytes contains valid 
data.
+      // (empty dataBytes, unknown version)     => This is a valid case, and 
this can happen if the FeatureZNode
+      //                                           does not exist in ZK.
+      // (non-empty dataBytes, unknown version) => This case is impossible, 
since, KafkaZkClient.getDataAndVersion
+      //                                           API ensures that unknown 
version is returned only when the
+      //                                           ZK node is absent. 
Therefore dataBytes should be empty in such
+      //                                           a case.
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        if (featureZNode.status == FeatureZNodeStatus.Disabled) {
+          info(s"Feature ZK node at path: $featureZkNodePath is in disabled 
status")
+          FinalizedFeatureCache.clear()
+        } else if(featureZNode.status == FeatureZNodeStatus.Enabled) {

Review comment:
       nit: space

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,232 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a 
change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is 
asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized 
through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to notify 
the caller when an
+   *                            updateOrThrow() operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, 
maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest 
features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, 
then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method 
can be invoked
+     * only exactly once successfully.
+     *
+     * @throws   IllegalStateException, if a non-empty notifier was provided 
in the constructor, and
+     *           this method is called again after a successful previous 
invocation.
+     *
+     *           FeatureCacheUpdateException, if there was an error in 
updating the
+     *           FinalizedFeatureCache.
+     *
+     *           RuntimeException, if there was a failure in 
reading/deserializing the
+     *           contents of the feature ZK node.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than 
once successfully.")
+        }
+      })
+
+      info(s"Reading feature ZK node at path: $featureZkNodePath")

Review comment:
       This comment should be frequent and the `featureZkNodePath` is staying 
constant, could we just make it for debugging level?

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,232 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a 
change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is 
asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized 
through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to notify 
the caller when an
+   *                            updateOrThrow() operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, 
maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest 
features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, 
then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method 
can be invoked
+     * only exactly once successfully.
+     *
+     * @throws   IllegalStateException, if a non-empty notifier was provided 
in the constructor, and
+     *           this method is called again after a successful previous 
invocation.
+     *
+     *           FeatureCacheUpdateException, if there was an error in 
updating the
+     *           FinalizedFeatureCache.
+     *
+     *           RuntimeException, if there was a failure in 
reading/deserializing the
+     *           contents of the feature ZK node.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than 
once successfully.")
+        }
+      })
+
+      info(s"Reading feature ZK node at path: $featureZkNodePath")
+      var mayBeFeatureZNodeBytes: Option[Array[Byte]] = null
+      var version: Int = ZkVersion.UnknownVersion
+      try {
+        val result = zkClient.getDataAndVersion(featureZkNodePath)
+        mayBeFeatureZNodeBytes = result._1
+        version = result._2
+      } catch {
+        // Convert to RuntimeException, to avoid a confusion that there is no 
argument passed
+        // to the updateOrThrow() method.
+        case e: IllegalArgumentException => throw new RuntimeException(e)
+      }
+
+      // There are 4 cases:
+      //
+      // (empty dataBytes, valid version)       => The empty dataBytes will 
fail FeatureZNode deserialization.
+      //                                           FeatureZNode, when present 
in ZK, can not have empty contents.
+      // (non-empty dataBytes, valid version)   => This is a valid case, and 
should pass FeatureZNode deserialization
+      //                                           if dataBytes contains valid 
data.
+      // (empty dataBytes, unknown version)     => This is a valid case, and 
this can happen if the FeatureZNode
+      //                                           does not exist in ZK.
+      // (non-empty dataBytes, unknown version) => This case is impossible, 
since, KafkaZkClient.getDataAndVersion
+      //                                           API ensures that unknown 
version is returned only when the
+      //                                           ZK node is absent. 
Therefore dataBytes should be empty in such
+      //                                           a case.
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        if (featureZNode.status == FeatureZNodeStatus.Disabled) {
+          info(s"Feature ZK node at path: $featureZkNodePath is in disabled 
status")
+          FinalizedFeatureCache.clear()
+        } else if(featureZNode.status == FeatureZNodeStatus.Enabled) {
+          FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+        } else {
+          throw new IllegalStateException(s"Unexpected FeatureZNodeStatus 
found in $featureZNode")
+        }
+      }
+
+      maybeNotifyOnce.foreach(notifier => notifier.countDown())
+    }
+
+    /**
+     * Waits until at least a single updateLatestOrThrow completes 
successfully.
+     * NOTE: The method returns immediately if an updateLatestOrThrow call has 
already completed

Review comment:
       I don't think this note is necessary, maybe just merge with the first 
line as:
   ```
   Waits until exactly one updateLatestOrThrow completes successfully.
   ```

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -442,6 +445,8 @@ object KafkaConfig {
   val ControlledShutdownMaxRetriesProp = "controlled.shutdown.max.retries"
   val ControlledShutdownRetryBackoffMsProp = 
"controlled.shutdown.retry.backoff.ms"
   val ControlledShutdownEnableProp = "controlled.shutdown.enable"
+  /** ********* Features configuration ***********/
+  val FeatureChangeListenerCacheUpdateWaitTimeMsProp = 
"feature.listener.cache.update.wait.ms"

Review comment:
       Do you think we should add this config as part of the KIP since it is 
public? I think it would just be a minor update, but let's wait and see others 
thoughts on this.

##########
File path: core/src/main/scala/kafka/zk/ZkData.scala
##########
@@ -744,6 +782,161 @@ object DelegationTokenInfoZNode {
   def decode(bytes: Array[Byte]): Option[TokenInformation] = 
DelegationTokenManager.fromBytes(bytes)
 }
 
+/**
+ * Represents the status of the FeatureZNode.
+ *
+ * Enabled  -> This status means the feature versioning system (KIP-584) is 
enabled, and, the
+ *             finalized features stored in the FeatureZNode are active. This 
status is written by
+ *             the controller to the FeatureZNode only when the broker IBP 
config is greater than
+ *             or equal to KAFKA_2_6_IV1.
+ *
+ * Disabled -> This status means the feature versioning system (KIP-584) is 
disabled, and, the
+ *             the finalized features stored in the FeatureZNode is not 
relevant. This status is
+ *             written by the controller to the FeatureZNode only when the 
broker IBP config
+ *             is less than KAFKA_2_6_IV1.
+ *
+ * The purpose behind the FeatureZNodeStatus is that it helps differentiates 
between the following
+ * cases:
+ *
+ * 1. New cluster bootstrap:
+ *    For a new Kafka cluster (i.e. it is deployed first time), we would like 
to start the cluster
+ *    with all the possible supported features finalized immediately. The new 
cluster will almost
+ *    never be started with an old IBP config that’s less than KAFKA_2_6_IV1. 
In such a case, the
+ *    controller will start up and notice that the FeatureZNode is absent in 
the new cluster.
+ *    To handle the requirement, the controller will create a FeatureZNode 
(with enabled status)
+ *    containing the entire list of supported features as it’s finalized 
features.

Review comment:
       s/it's/its

##########
File path: core/src/main/scala/kafka/server/SupportedFeatures.scala
##########
@@ -0,0 +1,76 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, 
SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A common immutable object used in the Broker to define the latest features 
supported by the
+ * Broker. Also provides API to check for incompatibilities between the latest 
features supported
+ * by the Broker and cluster-wide finalized features.
+ *
+ * NOTE: the update() and clear() APIs of this class should be used only for 
testing purposes.
+ */
+object SupportedFeatures extends Logging {
+
+  /**
+   * This is the latest features supported by the Broker.
+   * This is currently empty, but in the future as we define supported 
features, this map should be
+   * populated.
+   */
+  @volatile private var supportedFeatures = emptySupportedFeatures
+
+  /**
+   * Returns a reference to the latest features supported by the Broker.
+   */
+  def get: Features[SupportedVersionRange] = {
+    supportedFeatures
+  }
+
+  // Should be used only for testing.
+  def update(newFeatures: Features[SupportedVersionRange]): Unit = {
+    supportedFeatures = newFeatures
+  }
+
+  // Should be used only for testing.
+  def clear(): Unit = {
+    supportedFeatures = emptySupportedFeatures
+  }
+
+  /**
+   * Returns the set of feature names found to be 'incompatible'.
+   * A feature incompatibility is a version mismatch between the latest 
feature supported by the
+   * Broker, and the provided cluster-wide finalized feature. This can happen 
because a provided
+   * cluster-wide finalized feature:
+   *  1) Does not exist in the Broker (i.e. it is unknown to the Broker).
+   *           [OR]
+   *  2) Exists but the FinalizedVersionRange does not match with the 
supported feature's SupportedVersionRange.
+   *
+   * @param finalized   The finalized features against which incompatibilities 
need to be checked for.
+   *
+   * @return            The sub-set of input features which are incompatible. 
If the returned object
+   *                    is empty, it means there were no feature 
incompatibilities found.
+   */
+  def incompatibleFeatures(finalized: Features[FinalizedVersionRange]): 
Features[FinalizedVersionRange] = {
+    val incompatibilities = finalized.features.asScala.collect {
+      case (feature, versionLevels) => {
+        val supportedVersions = supportedFeatures.get(feature);

Review comment:
       remove semi-colon

##########
File path: core/src/main/scala/kafka/zk/ZkData.scala
##########
@@ -744,6 +782,161 @@ object DelegationTokenInfoZNode {
   def decode(bytes: Array[Byte]): Option[TokenInformation] = 
DelegationTokenManager.fromBytes(bytes)
 }
 
+/**
+ * Represents the status of the FeatureZNode.
+ *
+ * Enabled  -> This status means the feature versioning system (KIP-584) is 
enabled, and, the
+ *             finalized features stored in the FeatureZNode are active. This 
status is written by
+ *             the controller to the FeatureZNode only when the broker IBP 
config is greater than
+ *             or equal to KAFKA_2_6_IV1.
+ *
+ * Disabled -> This status means the feature versioning system (KIP-584) is 
disabled, and, the
+ *             the finalized features stored in the FeatureZNode is not 
relevant. This status is
+ *             written by the controller to the FeatureZNode only when the 
broker IBP config
+ *             is less than KAFKA_2_6_IV1.
+ *
+ * The purpose behind the FeatureZNodeStatus is that it helps differentiates 
between the following
+ * cases:
+ *
+ * 1. New cluster bootstrap:
+ *    For a new Kafka cluster (i.e. it is deployed first time), we would like 
to start the cluster
+ *    with all the possible supported features finalized immediately. The new 
cluster will almost
+ *    never be started with an old IBP config that’s less than KAFKA_2_6_IV1. 
In such a case, the
+ *    controller will start up and notice that the FeatureZNode is absent in 
the new cluster.
+ *    To handle the requirement, the controller will create a FeatureZNode 
(with enabled status)
+ *    containing the entire list of supported features as it’s finalized 
features.
+ *
+ * 2. Cluster upgrade:
+ *    Imagine that a Kafka cluster exists already and the IBP config is less 
than KAFKA_2_6_IV1, but

Review comment:
       `a Kafka cluster exists already and the IBP config is less than 
KAFKA_2_6_IV1` to `an existing Kafka cluster with  IBP config less than 
KAFKA_2_6_IV1`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to