abbccdda commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496901588



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+
+/**
+ * Represents a range of version levels supported by every broker in a cluster for some feature.
+ */
+public class FinalizedVersionRange {

Review comment:
       Do we want a different name from
`org.apache.kafka.common.feature.FinalizedVersionRange`, such as
`FinalizedVersionLevels`? The same applies to `SupportedVersionRange`.
Personally, I feel that reusing the same class name makes navigation harder.

##########
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##########
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the latest features supported by the Broker and also provides APIs to
+ * check for incompatibilities between the features supported by the Broker and finalized features.
+ * The class also enables feature version level deprecation, as explained below. This class is
+ * immutable in production. It provides few APIs to mutate state only for the purpose of testing.
+ *
+ * Feature version level deprecation:
+ * ==================================
+ *
+ * Deprecation of certain version levels of a feature is a process to stop supporting the
+ * functionality offered by the feature at those version levels, across the entire Kafka cluster.
+ * Feature version deprecation is a simple 2-step process explained below. In each step below, an
+ * example is provided to help understand the process better:
+ *
+ * STEP 1:
+ * =======
+ *
+ * In the first step, a major Kafka release is made with a Broker code change (explained later
+ * below) that establishes the intent to deprecate certain versions of one or more features
+ * cluster-wide. When this new Kafka release is deployed to the cluster, deprecated finalized
+ * feature versions are no longer advertised to the client, but they can still be used by existing
+ * connections. The way it works is that the feature versioning system (via the controller) will
+ * automatically persist the new minVersionLevel for the feature in ZK to propagate the deprecation
+ * of certain versions. After this happens, any external client that queries the Broker to learn the
+ * feature versions will at some point start to see the new value for the finalized minVersionLevel
+ * for the feature. The external clients are expected to stop using the deprecated versions at least
+ * by the time that they learn about it.
+ *
+ * Here is how the above code change needs to be done:
+ * In order to deprecate feature version levels, in the supportedFeatures map you need to supply a
+ * specific firstActiveVersion value that's higher than the minVersion for the feature. The
+ * value for firstActiveVersion should be 1 beyond the highest version that you intend to deprecate
+ * for that feature. Whenever the controller is elected or the features are finalized via the
+ * ApiKeys.UPDATE_FEATURES api, the feature version levels in the closed range:
+ * [minVersion, firstActiveVersion - 1] are automatically deprecated in ZK by the controller logic.
+ *
+ * Example:
+ * - Let us assume the existing finalized feature in ZK:
+ *   {
+ *      "feature" -> FinalizedVersionRange(minVersionLevel=1, maxVersionLevel=5)
+ *   }
+ *   Now, supposing you would like to deprecate feature version levels: [1, 2].
+ *   Then, in the supportedFeatures map you should supply the following:
+ *   supportedFeatures = {
+ *     "feature" -> SupportedVersionRange(minVersion=1, firstActiveVersion=3, maxVersion=5)
+ *   }
+ * - If you do NOT want to deprecate a version level for a feature, then, in the supportedFeatures

Review comment:
       I think we could just make `firstActiveVersion = minVersion` the
default, to avoid requiring `firstActiveVersion` to be configured.
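
A minimal sketch of the suggested default, written in Java under stated assumptions: `SupportedVersionRangeSketch` is a hypothetical stand-in rather than the class in this PR, it uses `int` fields for brevity, and its validation rule is an assumption. The two-argument constructor delegates to the three-argument one with `firstActiveVersion = minVersion`, so callers would only spell out `firstActiveVersion` when they intend to deprecate levels.

```java
// Hypothetical stand-in for illustration; not the SupportedVersionRange class from this PR.
final class SupportedVersionRangeSketch {
    private final int minVersion;
    private final int firstActiveVersion;
    private final int maxVersion;

    // Full form: supply firstActiveVersion explicitly when deprecating version levels.
    SupportedVersionRangeSketch(int minVersion, int firstActiveVersion, int maxVersion) {
        // Assumed invariant: minVersion <= firstActiveVersion <= maxVersion.
        if (firstActiveVersion < minVersion || firstActiveVersion > maxVersion) {
            throw new IllegalArgumentException(
                "Expected minVersion <= firstActiveVersion <= maxVersion, got: "
                    + minVersion + ", " + firstActiveVersion + ", " + maxVersion);
        }
        this.minVersion = minVersion;
        this.firstActiveVersion = firstActiveVersion;
        this.maxVersion = maxVersion;
    }

    // Short form: no deprecation intended, so firstActiveVersion defaults to minVersion.
    SupportedVersionRangeSketch(int minVersion, int maxVersion) {
        this(minVersion, minVersion, maxVersion);
    }

    int firstActiveVersion() {
        return firstActiveVersion;
    }

    // Number of levels in the closed range [minVersion, firstActiveVersion - 1].
    int deprecatedLevelCount() {
        return firstActiveVersion - minVersion;
    }

    @Override
    public String toString() {
        return "SupportedVersionRange(minVersion=" + minVersion
            + ", firstActiveVersion=" + firstActiveVersion
            + ", maxVersion=" + maxVersion + ")";
    }

    public static void main(String[] args) {
        // No deprecation: firstActiveVersion is filled in from minVersion.
        System.out.println(new SupportedVersionRangeSketch(1, 5));
        // Deprecating levels [1, 2]: firstActiveVersion is given explicitly.
        System.out.println(new SupportedVersionRangeSketch(1, 3, 5));
    }
}
```

With a default like this, the "no deprecation" case from the Javadoc would no longer need to repeat `minVersion` as `firstActiveVersion`.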

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -20,26 +20,31 @@ package kafka.server
 import kafka.utils.Logging
 import org.apache.kafka.common.feature.{Features, FinalizedVersionRange}
 
+import scala.concurrent.TimeoutException
+import scala.math.max
+
 // Raised whenever there was an error in updating the FinalizedFeatureCache with features.
 class FeatureCacheUpdateException(message: String) extends RuntimeException(message) {
 }
 
 // Helper class that represents finalized features along with an epoch value.
-case class FinalizedFeaturesAndEpoch(features: Features[FinalizedVersionRange], epoch: Int) {
+case class FinalizedFeaturesAndEpoch(features: Features[FinalizedVersionRange], epoch: Long) {

Review comment:
       So we are saving the ZK epoch in a long, which was supposed to be an int 
field?
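
For context on the type question, here is a small Java sketch, assuming the epoch originates from an `int` znode version as ZooKeeper reports it. The thread does not state why `Long` was chosen, and the helper names below are hypothetical. It only shows that widening `int` to `long` is lossless, while converting back needs an explicit range check.

```java
// Hypothetical helpers for illustration; not code from this PR.
final class EpochWideningSketch {
    // Widening int -> long is implicit and never loses information.
    static long toEpoch(int zkVersion) {
        return zkVersion;
    }

    // Narrowing long -> int must be checked; Math.toIntExact throws
    // ArithmeticException when the value does not fit in an int.
    static int toZkVersion(long epoch) {
        return Math.toIntExact(epoch);
    }

    public static void main(String[] args) {
        long epoch = toEpoch(Integer.MAX_VALUE);
        System.out.println("widened epoch = " + epoch);
        System.out.println("narrowed back = " + toZkVersion(epoch));
        try {
            toZkVersion(epoch + 1);
        } catch (ArithmeticException e) {
            System.out.println("out of int range: " + e.getMessage());
        }
    }
}
```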

##########
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##########
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the latest features supported by the Broker and also provides APIs to
+ * check for incompatibilities between the features supported by the Broker and finalized features.
+ * The class also enables feature version level deprecation, as explained below. This class is
+ * immutable in production. It provides few APIs to mutate state only for the purpose of testing.
+ *
+ * Feature version level deprecation:
+ * ==================================
+ *
+ * Deprecation of certain version levels of a feature is a process to stop supporting the
+ * functionality offered by the feature at those version levels, across the entire Kafka cluster.
+ * Feature version deprecation is a simple 2-step process explained below. In each step below, an
+ * example is provided to help understand the process better:
+ *
+ * STEP 1:
+ * =======
+ *
+ * In the first step, a major Kafka release is made with a Broker code change (explained later
+ * below) that establishes the intent to deprecate certain versions of one or more features
+ * cluster-wide. When this new Kafka release is deployed to the cluster, deprecated finalized
+ * feature versions are no longer advertised to the client, but they can still be used by existing
+ * connections. The way it works is that the feature versioning system (via the controller) will
+ * automatically persist the new minVersionLevel for the feature in ZK to propagate the deprecation
+ * of certain versions. After this happens, any external client that queries the Broker to learn the
+ * feature versions will at some point start to see the new value for the finalized minVersionLevel
+ * for the feature. The external clients are expected to stop using the deprecated versions at least
+ * by the time that they learn about it.
+ *
+ * Here is how the above code change needs to be done:
+ * In order to deprecate feature version levels, in the supportedFeatures map you need to supply a
+ * specific firstActiveVersion value that's higher than the minVersion for the feature. The
+ * value for firstActiveVersion should be 1 beyond the highest version that you intend to deprecate
+ * for that feature. Whenever the controller is elected or the features are finalized via the
+ * ApiKeys.UPDATE_FEATURES api, the feature version levels in the closed range:
+ * [minVersion, firstActiveVersion - 1] are automatically deprecated in ZK by the controller logic.
+ *
+ * Example:
+ * - Let us assume the existing finalized feature in ZK:
+ *   {
+ *      "feature" -> FinalizedVersionRange(minVersionLevel=1, maxVersionLevel=5)
+ *   }
+ *   Now, supposing you would like to deprecate feature version levels: [1, 2].
+ *   Then, in the supportedFeatures map you should supply the following:
+ *   supportedFeatures = {
+ *     "feature" -> SupportedVersionRange(minVersion=1, firstActiveVersion=3, maxVersion=5)
+ *   }
+ * - If you do NOT want to deprecate a version level for a feature, then, in the supportedFeatures
+ *   map you should supply the firstActiveVersion to be the same as the minVersion supplied for that
+ *   feature.
+ *   Example:
+ *   supportedFeatures = {
+ *     "feature" -> SupportedVersionRange(minVersion=1, firstActiveVersion=1, maxVersion=5)
+ *   }
+ *   The above indicates no intent to deprecate any version levels for the feature.
+ *
+ * STEP 2:
+ * =======
+ *
+ * After the first step is over, you may (at some point) want to permanently remove the code/logic
+ * for the functionality offered by the deprecated feature versions. This is the second step. Here a
+ * subsequent major Kafka release is made with another Broker code change that removes the code for
+ * the functionality offered by the deprecated feature versions. This would completely drop support
+ * for the deprecated versions. Such a code change needs to be supplemented by supplying a
+ * suitable higher minVersion value for the feature in the supportedFeatures map.
+ * Example:
+ * - In the example above in step 1, we showed how to deprecate version levels [1, 2] for
+ *   "feature". Now let us assume the following finalized feature in ZK (after the deprecation
+ *   has been carried out):
+ *   {
+ *     "feature" -> FinalizedVersionRange(minVersionLevel=3, maxVersionLevel=5)
+ *   }
+ *   Now, supposing you would like to permanently remove support for feature versions: [1, 2].

Review comment:
       Similarly here: make `firstActiveVersion = minVersion` the default.





