kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463915553
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -266,6 +275,178 @@ class KafkaController(val config: KafkaConfig,
}
}
+ private def createFeatureZNode(newNode: FeatureZNode): Int = {
+ info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents:
$newNode")
+ zkClient.createFeatureZNode(newNode)
+ val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+ newVersion
+ }
+
+ private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+ info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents:
$updatedNode")
+ zkClient.updateFeatureZNode(updatedNode)
+ }
+
+ /**
+ * This method enables the feature versioning system (KIP-584).
+ *
+ * Development in Kafka (from a high level) is organized into features. Each
feature is tracked by
+ * a name and a range of version numbers. A feature can be of two types:
+ *
+ * 1. Supported feature:
+ * A supported feature is represented by a name (String) and a range of
versions (defined by a
+ * {@link SupportedVersionRange}). It refers to a feature that a particular
broker advertises
+ * support for. Each broker advertises the version ranges of its own
supported features in its
+ * own BrokerIdZNode. The contents of the advertisement are specific to the
particular broker and
+ * do not represent any guarantee of a cluster-wide availability of the
feature for any particular
+ * range of versions.
+ *
+ * 2. Finalized feature:
+ * A finalized feature is represented by a name (String) and a range of
version levels (defined
+ * by a {@link FinalizedVersionRange}). Whenever the feature versioning
system (KIP-584) is
+ * enabled, the finalized features are stored in the cluster-wide common
FeatureZNode.
+ * In comparison to a supported feature, the key difference is that a
finalized feature exists
+ * in ZK only when it is guaranteed to be supported by any random broker in
the cluster for a
+ * specified range of version levels. Also, the controller is the only
entity modifying the
+ * information about finalized features.
+ *
+ * This method sets up the FeatureZNode with enabled status, which means
that the finalized
+ * features stored in the FeatureZNode are active. The enabled status should
be written by the
+ * controller to the FeatureZNode only when the broker IBP config is greater
than or equal to
+ * KAFKA_2_7_IV0.
+ *
+ * There are multiple cases handled here:
+ *
+ * 1. New cluster bootstrap:
+ * A new Kafka cluster (i.e. it is deployed first time) is almost always
started with IBP config
+ * setting greater than or equal to KAFKA_2_7_IV0. We would like to start
the cluster with all
+ * the possible supported features finalized immediately. Assuming this
is the case, the
+ * controller will start up and notice that the FeatureZNode is absent in
the new cluster,
+ * it will then create a FeatureZNode (with enabled status) containing
the entire list of
+ * default supported features as its finalized features.
+ *
+ * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+ * Imagine there is an existing Kafka cluster with IBP config less than
KAFKA_2_7_IV0, and the
+ * broker binary has been upgraded to a newer version that supports the
feature versioning
+ * system (KIP-584). This means the user is upgrading from an earlier
version of the broker
+ * binary. In this case, we want to start with no finalized features and
allow the user to
+ * finalize them whenever they are ready i.e. in the future whenever the
user sets IBP config
+ * to be greater than or equal to KAFKA_2_7_IV0, then the user could
start finalizing the
+ * features. This process ensures we do not enable all the possible
features immediately after
+ * an upgrade, which could be harmful to Kafka.
+ * This is how we handle such a case:
+ * - Before the IBP config upgrade (i.e. IBP config set to less than
KAFKA_2_7_IV0), the
+ * controller will start up and check if the FeatureZNode is absent.
If absent, it will
+ * react by creating a FeatureZNode with disabled status and empty
finalized features.
+ * Otherwise, if a node already exists in enabled status then the
controller will just
+ * flip the status to disabled and clear the finalized features.
+ * - After the IBP config upgrade (i.e. IBP config set to greater than
or equal to
+ * KAFKA_2_7_IV0), when the controller starts up it will check if the
FeatureZNode exists
+ * and whether it is disabled. In such a case, it won’t upgrade all
features immediately.
+ * Instead it will just switch the FeatureZNode status to enabled
status. This lets the
+ * user finalize the features later.
+ *
+ * 3. Broker binary upgraded, with existing cluster IBP config >=
KAFKA_2_7_IV0:
+ * Imagine an existing Kafka cluster with IBP config >= KAFKA_2_7_IV0,
and the broker binary
+ * has just been upgraded to a newer version (that supports IBP config
KAFKA_2_7_IV0 and higher).
+ * The controller will start up and find that a FeatureZNode is already
present with enabled
+ * status and existing finalized features. In such a case, the controller
needs to scan the
+ * existing finalized features and mutate them for the purpose of version
level deprecation
+ * (if needed).
+ * This is how we handle this case: If an existing finalized feature is
present in the default
+ * finalized features, then, its existing minimum version level is
updated to the default
+ * minimum version level maintained in the BrokerFeatures object. The
goal of this mutation is
+ * to permanently deprecate one or more feature version levels. The range
of feature version
+ * levels deprecated are from the closed range:
[existing_min_version_level, default_min_version_level].
+ * NOTE: Deprecating a feature version level is an incompatible change,
which requires a major
+ * release of Kafka. In such a release, the minimum version level
maintained within the
+ * BrokerFeatures class is updated suitably to record the deprecation of
the feature.
+ *
+ * 4. Broker downgrade:
+ * Imagine that a Kafka cluster exists already and the IBP config is
greater than or equal to
+ * KAFKA_2_7_IV0. Then, the user decided to downgrade the cluster by
setting IBP config to a
+ * value less than KAFKA_2_7_IV0. This means the user is also disabling
the feature versioning
+ * system (KIP-584). In this case, when the controller starts up with the
lower IBP config, it
+ * will switch the FeatureZNode status to disabled with empty features.
+ */
+ private def enableFeatureVersioning(): Unit = {
+ val defaultFinalizedFeatures = brokerFeatures.getDefaultFinalizedFeatures
+ val (mayBeFeatureZNodeBytes, version) =
zkClient.getDataAndVersion(FeatureZNode.path)
+ if (version == ZkVersion.UnknownVersion) {
+ val newVersion = createFeatureZNode(new
FeatureZNode(FeatureZNodeStatus.Enabled, defaultFinalizedFeatures))
+ featureCache.waitUntilEpochOrThrow(newVersion,
config.zkConnectionTimeoutMs)
+ } else {
+ val existingFeatureZNode =
FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+ var newFeatures: Features[FinalizedVersionRange] =
Features.emptyFinalizedFeatures()
+ if (existingFeatureZNode.status.equals(FeatureZNodeStatus.Enabled)) {
+ newFeatures =
Features.finalizedFeatures(existingFeatureZNode.features.features().asScala.map
{
+ case (featureName, existingVersionRange) =>
+ val brokerDefaultVersionRange =
defaultFinalizedFeatures.get(featureName)
+ if (brokerDefaultVersionRange == null) {
+ warn(s"Existing finalized feature: $featureName with
$existingVersionRange"
+ + s" is absent in default finalized $defaultFinalizedFeatures")
+ (featureName, existingVersionRange)
+ } else if (brokerDefaultVersionRange.max() >=
existingVersionRange.max() &&
+ brokerDefaultVersionRange.min() <=
existingVersionRange.max()) {
+ // Through this change, we deprecate all version levels in the
closed range:
+ // [existingVersionRange.min(), brokerDefaultVersionRange.min()
- 1]
+ (featureName, new
FinalizedVersionRange(brokerDefaultVersionRange.min(),
existingVersionRange.max()))
+ } else {
+ // If the existing version levels fall completely outside the
+ // range of the default finalized version levels (i.e. no
intersection), or, if the
+ // existing version levels are ineligible for a modification
since they are
+ // incompatible with default finalized version levels, then we
skip the update.
+ warn(s"Can not update minimum version level in finalized
feature: $featureName,"
+ + s" since the existing $existingVersionRange is not eligible
for a change"
+ + s" based on the default $brokerDefaultVersionRange.")
+ (featureName, existingVersionRange)
+ }
+ }.asJava)
+ }
+ val newFeatureZNode = new FeatureZNode(FeatureZNodeStatus.Enabled,
newFeatures)
+ if (!newFeatureZNode.equals(existingFeatureZNode)) {
+ val newVersion = updateFeatureZNode(newFeatureZNode)
+ featureCache.waitUntilEpochOrThrow(newVersion,
config.zkConnectionTimeoutMs)
+ }
+ }
+ }
+
+ /**
+ * Disables the feature versioning system (KIP-584).
+ *
+ * Sets up the FeatureZNode with disabled status. This status means the
feature versioning system
+ * (KIP-584) is disabled, and, the finalized features stored in the
FeatureZNode are not relevant.
+ * This status should be written by the controller to the FeatureZNode only
when the broker
+ * IBP config is less than KAFKA_2_7_IV0.
+ *
+ * NOTE:
+ * 1. When this method returns, existing finalized features (if any) will be
cleared from the
+ * FeatureZNode.
+ * 2. This method, unlike enableFeatureVersioning() need not wait for the
FinalizedFeatureCache
+ * to be updated, because, such updates to the cache (via
FinalizedFeatureChangeListener)
+ * are disabled when IBP config is < than KAFKA_2_7_IV0.
+ */
+ private def disableFeatureVersioning(): Unit = {
+ val newNode = FeatureZNode(FeatureZNodeStatus.Disabled,
Features.emptyFinalizedFeatures())
+ val (mayBeFeatureZNodeBytes, version) =
zkClient.getDataAndVersion(FeatureZNode.path)
+ if (version == ZkVersion.UnknownVersion) {
+ createFeatureZNode(newNode)
Review comment:
No, that is not required. Please refer to the documentation under `NOTE`
for this method where I have explained why.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]